mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-01 15:02:55 +00:00
Compare commits
15 Commits
softcommit
...
issue/526b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
edcfa915ff | ||
|
|
96a4f503ec | ||
|
|
9df288b0c9 | ||
|
|
b7c2d0de97 | ||
|
|
62445e0ec8 | ||
|
|
a228825462 | ||
|
|
d3eabd14bc | ||
|
|
c967031d21 | ||
|
|
d823163d52 | ||
|
|
c4f59f202d | ||
|
|
acd29b535d | ||
|
|
2cd31bcda2 | ||
|
|
99870de55c | ||
|
|
cad2d91845 | ||
|
|
79f3cd6cf4 |
@@ -29,7 +29,7 @@ addons:
|
||||
matrix:
|
||||
include:
|
||||
# Android
|
||||
- env: TARGET=aarch64-linux-android DISABLE_TESTS
|
||||
- env: TARGET=aarch64-linux-android DISABLE_TESTS=1
|
||||
#- env: TARGET=arm-linux-androideabi DISABLE_TESTS=1
|
||||
#- env: TARGET=armv7-linux-androideabi DISABLE_TESTS=1
|
||||
#- env: TARGET=i686-linux-android DISABLE_TESTS=1
|
||||
@@ -68,6 +68,11 @@ cache: cargo
|
||||
before_cache:
|
||||
# Travis can't cache files that are not readable by "others"
|
||||
- chmod -R a+r $HOME/.cargo
|
||||
- find ./target/debug -type f -maxdepth 1 -delete
|
||||
- rm -f ./target/.rustc_info.json
|
||||
- rm -fr ./target/debug/{deps,.fingerprint}/tantivy*
|
||||
- rm -r target/debug/examples/
|
||||
- ls -1 examples/ | sed -e 's/\.rs$//' | xargs -I "{}" find target/* -name "*{}*" -type f -delete
|
||||
|
||||
#branches:
|
||||
# only:
|
||||
|
||||
41
CHANGELOG.md
41
CHANGELOG.md
@@ -1,3 +1,15 @@
|
||||
Tantivy 0.10.0
|
||||
====================
|
||||
|
||||
|
||||
Minor
|
||||
---------
|
||||
- Small simplification of the code.
|
||||
Calling .freq() or .doc() when .advance() has never
|
||||
on segment postings should panic from now on.
|
||||
- Tokens exceeding `u16::max_value() - 4` chars are discarded silently instead of panicking.
|
||||
|
||||
|
||||
Tantivy 0.9.0
|
||||
=====================
|
||||
*0.9.0 index format is not compatible with the
|
||||
@@ -17,6 +29,35 @@ previous index format.*
|
||||
- Added IndexReader. By default, index is reloaded automatically upon new commits (@fulmicoton)
|
||||
- SIMD linear search within blocks (@fulmicoton)
|
||||
|
||||
## How to update ?
|
||||
|
||||
tantivy 0.9 brought some API breaking change.
|
||||
To update from tantivy 0.8, you will need to go through the following steps.
|
||||
|
||||
- `schema::INT_INDEXED` and `schema::INT_STORED` should be replaced by `schema::INDEXED` and `schema::INT_STORED`.
|
||||
- The index now does not hold the pool of searcher anymore. You are required to create an intermediary object called
|
||||
`IndexReader` for this.
|
||||
|
||||
```rust
|
||||
// create the reader. You typically need to create 1 reader for the entire
|
||||
// lifetime of you program.
|
||||
let reader = index.reader()?;
|
||||
|
||||
// Acquire a searcher (previously `index.searcher()`) is now written:
|
||||
let searcher = reader.searcher();
|
||||
|
||||
// With the default setting of the reader, you are not required to
|
||||
// call `index.load_searchers()` anymore.
|
||||
//
|
||||
// The IndexReader will pick up that change automatically, regardless
|
||||
// of whether the update was done in a different process or not.
|
||||
// If this behavior is not wanted, you can create your reader with
|
||||
// the `ReloadPolicy::Manual`, and manually decide when to reload the index
|
||||
// by calling `reader.reload()?`.
|
||||
|
||||
```
|
||||
|
||||
|
||||
Tantivy 0.8.2
|
||||
=====================
|
||||
Fixing build for x86_64 platforms. (#496)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy"
|
||||
version = "0.9.0"
|
||||
version = "0.10.0-dev"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = ["database-implementations", "data-structures"]
|
||||
@@ -23,7 +23,7 @@ snap = {version="0.2"}
|
||||
atomicwrites = {version="0.2.2", optional=true}
|
||||
tempfile = "3.0"
|
||||
log = "0.4"
|
||||
combine = "3"
|
||||
combine = ">=3.6.0,<4.0.0"
|
||||
tempdir = "0.3"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
|
||||
@@ -13,7 +13,6 @@ pub use self::serialize::{BinarySerializable, FixedSize};
|
||||
pub use self::vint::{read_u32_vint, serialize_vint_u32, write_u32_vint, VInt};
|
||||
pub use byteorder::LittleEndian as Endianness;
|
||||
|
||||
|
||||
/// Segment's max doc must be `< MAX_DOC_LIMIT`.
|
||||
///
|
||||
/// We do not allow segments with more than
|
||||
|
||||
@@ -205,16 +205,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
/// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
|
||||
/// `OnCommit` `ReloadPolicy` to work properly.
|
||||
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle;
|
||||
|
||||
/// Ensure that all volatile files reach are persisted (in directory where that makes sense.)
|
||||
///
|
||||
/// In order to make Near Real Time efficient, tantivy introduced the notion of soft_commit vs
|
||||
/// commit. Commit will call `.flush()`, while softcommit won't.
|
||||
///
|
||||
/// `meta.json` should be the last file to be flushed.
|
||||
fn flush(&self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// DirectoryClone
|
||||
|
||||
@@ -368,7 +368,7 @@ impl Drop for ReleaseLockFile {
|
||||
|
||||
/// This Write wraps a File, but has the specificity of
|
||||
/// call `sync_all` on flush.
|
||||
pub struct SafeFileWriter(File);
|
||||
struct SafeFileWriter(File);
|
||||
|
||||
impl SafeFileWriter {
|
||||
fn new(file: File) -> SafeFileWriter {
|
||||
|
||||
@@ -13,7 +13,6 @@ mod managed_directory;
|
||||
mod ram_directory;
|
||||
mod read_only_source;
|
||||
mod watch_event_router;
|
||||
mod nrt_directory;
|
||||
|
||||
/// Errors specific to the directory module.
|
||||
pub mod error;
|
||||
|
||||
@@ -1,195 +0,0 @@
|
||||
use directory::Directory;
|
||||
use std::path::{PathBuf, Path};
|
||||
use directory::ReadOnlySource;
|
||||
use directory::error::OpenReadError;
|
||||
use directory::error::DeleteError;
|
||||
use std::io::{BufWriter, Cursor};
|
||||
use directory::SeekableWrite;
|
||||
use directory::error::OpenWriteError;
|
||||
use directory::WatchHandle;
|
||||
use directory::ram_directory::InnerRamDirectory;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::Arc;
|
||||
use directory::WatchCallback;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::io::{Seek, Write};
|
||||
use directory::DirectoryClone;
|
||||
|
||||
|
||||
const BUFFER_LEN: usize = 1_000_000;
|
||||
|
||||
|
||||
pub enum NRTWriter {
|
||||
InRam {
|
||||
buffer: Cursor<Vec<u8>>,
|
||||
path: PathBuf,
|
||||
nrt_directory: NRTDirectory
|
||||
},
|
||||
UnderlyingFile(BufWriter<Box<SeekableWrite>>)
|
||||
}
|
||||
|
||||
impl NRTWriter {
|
||||
pub fn new(path: PathBuf, nrt_directory: NRTDirectory) -> NRTWriter {
|
||||
NRTWriter::InRam {
|
||||
buffer: Cursor::new(Vec::with_capacity(BUFFER_LEN)),
|
||||
path,
|
||||
nrt_directory,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Seek for NRTWriter {
|
||||
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
|
||||
match self {
|
||||
NRTWriter::InRam { buffer, path, nrt_directory } => {
|
||||
buffer.seek(pos)
|
||||
}
|
||||
NRTWriter::UnderlyingFile(file) => {
|
||||
file.seek(pos)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Write for NRTWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.write_all(buf)?;
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
match self {
|
||||
NRTWriter::InRam { buffer, path, nrt_directory } => {
|
||||
let mut cache_wlock = nrt_directory.cache.write().unwrap();
|
||||
cache_wlock.write(path.clone(), buffer.get_ref());
|
||||
Ok(())
|
||||
}
|
||||
NRTWriter::UnderlyingFile(file) => {
|
||||
file.flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
|
||||
// Working around the borrow checker.
|
||||
let mut underlying_write_opt: Option<BufWriter<Box<SeekableWrite>>> = None;
|
||||
if let NRTWriter::InRam { buffer, path, nrt_directory } = self {
|
||||
if buffer.get_ref().len() + buf.len() > BUFFER_LEN {
|
||||
// We can't keep this in RAM. Let's move it to the underlying directory.
|
||||
underlying_write_opt = Some(nrt_directory.open_write(path)
|
||||
.map_err(|open_err| {
|
||||
io::Error::new(io::ErrorKind::Other, open_err)
|
||||
})?);
|
||||
|
||||
}
|
||||
}
|
||||
if let Some(underlying_write) = underlying_write_opt {
|
||||
*self = NRTWriter::UnderlyingFile(underlying_write);
|
||||
}
|
||||
match self {
|
||||
NRTWriter::InRam { buffer, path, nrt_directory } => {
|
||||
assert!(buffer.get_ref().len() + buf.len() <= BUFFER_LEN);
|
||||
buffer.write_all(buf)
|
||||
}
|
||||
NRTWriter::UnderlyingFile(file) => {
|
||||
file.write_all(buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NRTDirectory {
|
||||
underlying: Box<Directory>,
|
||||
cache: Arc<RwLock<InnerRamDirectory>>,
|
||||
}
|
||||
|
||||
|
||||
impl Clone for NRTDirectory {
|
||||
fn clone(&self) -> Self {
|
||||
NRTDirectory {
|
||||
underlying: self.underlying.box_clone(),
|
||||
cache: self.cache.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NRTDirectory {
|
||||
fn wrap(underlying: Box<Directory>) -> NRTDirectory {
|
||||
NRTDirectory {
|
||||
underlying,
|
||||
cache: Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for NRTDirectory {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "NRTDirectory({:?})", self.underlying)
|
||||
}
|
||||
}
|
||||
|
||||
impl Directory for NRTDirectory {
|
||||
fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn delete(&self, path: &Path) -> Result<(), DeleteError> {
|
||||
// We explicitly release the lock, to prevent a panic on the underlying directory
|
||||
// to poison the lock.
|
||||
//
|
||||
// File can only go from cache to underlying so the result does not lead to
|
||||
// any inconsistency.
|
||||
{
|
||||
let mut cache_wlock = self.cache.write().unwrap();
|
||||
if cache_wlock.exists(path) {
|
||||
return cache_wlock.delete(path);
|
||||
}
|
||||
}
|
||||
self.underlying.delete(path)
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
// We explicitly release the lock, to prevent a panic on the underlying directory
|
||||
// to poison the lock.
|
||||
//
|
||||
// File can only go from cache to underlying so the result does not lead to
|
||||
// any inconsistency.
|
||||
{
|
||||
let rlock_cache = self.cache.read().unwrap();
|
||||
if rlock_cache.exists(path) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
self.underlying.exists(path)
|
||||
}
|
||||
|
||||
fn open_write(&mut self, path: &Path) -> Result<BufWriter<Box<SeekableWrite>>, OpenWriteError> {
|
||||
let mut cache_wlock = self.cache.write().unwrap();
|
||||
// TODO might poison our lock. I don't know have a sound solution yet.
|
||||
let path_buf = path.to_owned();
|
||||
if self.underlying.exists(path) {
|
||||
return Err(OpenWriteError::FileAlreadyExists(path_buf));
|
||||
}
|
||||
let exists = cache_wlock.write(path_buf.clone(), &[]);
|
||||
// force the creation of the file to mimic the MMap directory.
|
||||
if exists {
|
||||
Err(OpenWriteError::FileAlreadyExists(path_buf))
|
||||
} else {
|
||||
let vec_writer = NRTWriter::new(path_buf.clone(), self.clone());
|
||||
Ok(BufWriter::new(Box::new(vec_writer)))
|
||||
}
|
||||
}
|
||||
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||
self.underlying.atomic_read(path)
|
||||
}
|
||||
|
||||
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
|
||||
self.underlying.atomic_write(path, data)
|
||||
}
|
||||
|
||||
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
|
||||
self.underlying.watch(watch_callback)
|
||||
}
|
||||
}
|
||||
@@ -71,36 +71,36 @@ impl Write for VecWriter {
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct InnerRamDirectory {
|
||||
struct InnerDirectory {
|
||||
fs: HashMap<PathBuf, ReadOnlySource>,
|
||||
watch_router: WatchCallbackList,
|
||||
}
|
||||
|
||||
impl InnerRamDirectory {
|
||||
pub fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
|
||||
impl InnerDirectory {
|
||||
fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
|
||||
let data = ReadOnlySource::new(Vec::from(data));
|
||||
self.fs.insert(path, data).is_some()
|
||||
}
|
||||
|
||||
pub fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
|
||||
fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
|
||||
self.fs
|
||||
.get(path)
|
||||
.ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
|
||||
.map(|el| el.clone())
|
||||
}
|
||||
|
||||
pub fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> {
|
||||
fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> {
|
||||
match self.fs.remove(path) {
|
||||
Some(_) => Ok(()),
|
||||
None => Err(DeleteError::FileDoesNotExist(PathBuf::from(path))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn exists(&self, path: &Path) -> bool {
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
self.fs.contains_key(path)
|
||||
}
|
||||
|
||||
pub fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
|
||||
fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
|
||||
self.watch_router.subscribe(watch_handle)
|
||||
}
|
||||
}
|
||||
@@ -118,7 +118,7 @@ impl fmt::Debug for RAMDirectory {
|
||||
///
|
||||
#[derive(Clone, Default)]
|
||||
pub struct RAMDirectory {
|
||||
fs: Arc<RwLock<InnerRamDirectory>>,
|
||||
fs: Arc<RwLock<InnerDirectory>>,
|
||||
}
|
||||
|
||||
impl RAMDirectory {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use common::BitSet;
|
||||
use fastfield::DeleteBitSet;
|
||||
use std::borrow::Borrow;
|
||||
use std::borrow::BorrowMut;
|
||||
use std::cmp::Ordering;
|
||||
@@ -95,9 +96,23 @@ pub trait DocSet {
|
||||
}
|
||||
|
||||
/// Returns the number documents matching.
|
||||
///
|
||||
/// Calling this method consumes the `DocSet`.
|
||||
fn count(&mut self) -> u32 {
|
||||
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
|
||||
let mut count = 0u32;
|
||||
while self.advance() {
|
||||
if !delete_bitset.is_deleted(self.doc()) {
|
||||
count += 1u32;
|
||||
}
|
||||
}
|
||||
count
|
||||
}
|
||||
|
||||
/// Returns the count of documents, deleted or not.
|
||||
/// Calling this method consumes the `DocSet`.
|
||||
///
|
||||
/// Of course, the result is an upper bound of the result
|
||||
/// given by `count()`.
|
||||
fn count_including_deleted(&mut self) -> u32 {
|
||||
let mut count = 0u32;
|
||||
while self.advance() {
|
||||
count += 1u32;
|
||||
@@ -127,9 +142,14 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
|
||||
unboxed.size_hint()
|
||||
}
|
||||
|
||||
fn count(&mut self) -> u32 {
|
||||
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
|
||||
let unboxed: &mut TDocSet = self.borrow_mut();
|
||||
unboxed.count()
|
||||
unboxed.count(delete_bitset)
|
||||
}
|
||||
|
||||
fn count_including_deleted(&mut self) -> u32 {
|
||||
let unboxed: &mut TDocSet = self.borrow_mut();
|
||||
unboxed.count_including_deleted()
|
||||
}
|
||||
|
||||
fn append_to_bitset(&mut self, bitset: &mut BitSet) {
|
||||
|
||||
@@ -179,11 +179,6 @@ pub struct DeleteCursor {
|
||||
}
|
||||
|
||||
impl DeleteCursor {
|
||||
|
||||
pub fn empty() -> DeleteCursor {
|
||||
DeleteQueue::new().cursor()
|
||||
}
|
||||
|
||||
/// Skips operations and position it so that
|
||||
/// - either all of the delete operation currently in the
|
||||
/// queue are consume and the next get will return None.
|
||||
|
||||
@@ -259,7 +259,7 @@ pub fn advance_deletes(
|
||||
write_delete_bitset(&delete_bitset, &mut delete_file)?;
|
||||
}
|
||||
}
|
||||
segment_entry.set_meta(target_opstamp, segment.meta().clone());
|
||||
segment_entry.set_meta(segment.meta().clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -326,12 +326,7 @@ fn index_documents(
|
||||
// to even open the segment.
|
||||
None
|
||||
};
|
||||
let segment_entry = SegmentEntry::new(
|
||||
segment_meta,
|
||||
delete_cursor,
|
||||
delete_bitset_opt,
|
||||
last_docstamp,
|
||||
);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
|
||||
Ok(segment_updater.add_segment(generation, segment_entry))
|
||||
}
|
||||
|
||||
@@ -366,9 +361,9 @@ impl IndexWriter {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn add_segment(&mut self, segment_meta: SegmentMeta, opstamp: u64) {
|
||||
pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
|
||||
let delete_cursor = self.delete_queue.cursor();
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None, opstamp);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
|
||||
self.segment_updater
|
||||
.add_segment(self.generation, segment_entry);
|
||||
}
|
||||
@@ -532,7 +527,7 @@ impl IndexWriter {
|
||||
//
|
||||
// This will reach an end as the only document_sender
|
||||
// was dropped with the index_writer.
|
||||
for _ in document_receiver.iter() {}
|
||||
for _ in document_receiver.clone() {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -559,16 +554,6 @@ impl IndexWriter {
|
||||
/// using this API.
|
||||
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
|
||||
pub fn prepare_commit(&mut self) -> Result<PreparedCommit> {
|
||||
info!("Preparing commit");
|
||||
self.prepare_commit_internal(false)
|
||||
}
|
||||
|
||||
pub fn prepare_commit_soft(&mut self) -> Result<PreparedCommit> {
|
||||
info!("Preparing soft commit");
|
||||
self.prepare_commit_internal(true)
|
||||
}
|
||||
|
||||
pub(crate) fn prepare_commit_internal(&mut self, soft: bool) -> Result<PreparedCommit> {
|
||||
// Here, because we join all of the worker threads,
|
||||
// all of the segment update for this commit have been
|
||||
// sent.
|
||||
@@ -591,13 +576,13 @@ impl IndexWriter {
|
||||
let indexing_worker_result = worker_handle
|
||||
.join()
|
||||
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
|
||||
// add a new worker for the next generation, whether the worker failed or not.
|
||||
self.add_indexing_worker()?;
|
||||
indexing_worker_result?;
|
||||
// add a new worker for the next generation.
|
||||
self.add_indexing_worker()?;
|
||||
}
|
||||
|
||||
let commit_opstamp = self.stamper.stamp();
|
||||
let prepared_commit = PreparedCommit::new(self, commit_opstamp, soft);
|
||||
let prepared_commit = PreparedCommit::new(self, commit_opstamp);
|
||||
info!("Prepared commit {}", commit_opstamp);
|
||||
Ok(prepared_commit)
|
||||
}
|
||||
@@ -620,11 +605,6 @@ impl IndexWriter {
|
||||
self.prepare_commit()?.commit()
|
||||
}
|
||||
|
||||
|
||||
pub fn soft_commit(&mut self) -> Result<u64> {
|
||||
self.prepare_commit_soft()?.commit()
|
||||
}
|
||||
|
||||
pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
|
||||
&self.segment_updater
|
||||
}
|
||||
@@ -752,7 +732,6 @@ mod tests {
|
||||
use Index;
|
||||
use ReloadPolicy;
|
||||
use Term;
|
||||
use IndexReader;
|
||||
|
||||
#[test]
|
||||
fn test_operations_group() {
|
||||
@@ -886,13 +865,6 @@ mod tests {
|
||||
let _index_writer_two = index.writer(3_000_000).unwrap();
|
||||
}
|
||||
|
||||
fn num_docs_containing_text(reader: &IndexReader, term: &str) -> u64 {
|
||||
let searcher = reader.searcher();
|
||||
let text_field = reader.schema().get_field("text").unwrap();
|
||||
let term = Term::from_field_text(text_field, term);
|
||||
searcher.doc_freq(&term)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_commit_and_rollback() {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
@@ -909,12 +881,9 @@ mod tests {
|
||||
searcher.doc_freq(&term)
|
||||
};
|
||||
|
||||
let mut index_writer = index.writer(3_000_000).unwrap();
|
||||
|
||||
assert_eq!(index_writer.commit_opstamp(), 0u64);
|
||||
assert_eq!(num_docs_containing_text(&reader, "a"), 0);
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer(3_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"a"));
|
||||
index_writer.rollback().unwrap();
|
||||
assert_eq!(index_writer.commit_opstamp(), 0u64);
|
||||
@@ -933,35 +902,6 @@ mod tests {
|
||||
reader.searcher();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_softcommit_and_rollback() {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", schema::TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let reader = index.reader().unwrap();
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer(3_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"a"));
|
||||
index_writer.rollback().unwrap();
|
||||
|
||||
assert_eq!(index_writer.commit_opstamp(), 0u64);
|
||||
assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
|
||||
{
|
||||
index_writer.add_document(doc!(text_field=>"b"));
|
||||
index_writer.add_document(doc!(text_field=>"c"));
|
||||
}
|
||||
assert!(index_writer.soft_commit().is_ok());
|
||||
reader.reload().unwrap(); // we need to load soft committed stuff.
|
||||
assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
|
||||
assert_eq!(num_docs_containing_text(&reader, "b"), 1u64);
|
||||
assert_eq!(num_docs_containing_text(&reader, "c"), 1u64);
|
||||
index_writer.rollback().unwrap();
|
||||
reader.reload().unwrap();
|
||||
assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
|
||||
assert_eq!(num_docs_containing_text(&reader, "b"), 0u64);
|
||||
assert_eq!(num_docs_containing_text(&reader, "c"), 0u64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_merges() {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
@@ -995,7 +935,7 @@ mod tests {
|
||||
|
||||
reader.reload().unwrap();
|
||||
|
||||
assert_eq!(num_docs_containing_text(&reader, "a"), 200);
|
||||
assert_eq!(num_docs_containing("a"), 200);
|
||||
assert!(index.searchable_segments().unwrap().len() < 8);
|
||||
}
|
||||
}
|
||||
@@ -1038,7 +978,7 @@ mod tests {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", schema::TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let reader = index.reader();
|
||||
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
|
||||
|
||||
@@ -6,20 +6,14 @@ pub struct PreparedCommit<'a> {
|
||||
index_writer: &'a mut IndexWriter,
|
||||
payload: Option<String>,
|
||||
opstamp: u64,
|
||||
soft: bool,
|
||||
}
|
||||
|
||||
impl<'a> PreparedCommit<'a> {
|
||||
pub(crate) fn new(
|
||||
index_writer: &'a mut IndexWriter,
|
||||
opstamp: u64,
|
||||
soft: bool,
|
||||
) -> PreparedCommit {
|
||||
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
|
||||
PreparedCommit {
|
||||
index_writer,
|
||||
payload: None,
|
||||
opstamp,
|
||||
soft,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +33,7 @@ impl<'a> PreparedCommit<'a> {
|
||||
info!("committing {}", self.opstamp);
|
||||
self.index_writer
|
||||
.segment_updater()
|
||||
.commit(self.opstamp, self.payload, self.soft)?;
|
||||
.commit(self.opstamp, self.payload)?;
|
||||
Ok(self.opstamp)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ pub struct SegmentEntry {
|
||||
meta: SegmentMeta,
|
||||
delete_bitset: Option<BitSet>,
|
||||
delete_cursor: DeleteCursor,
|
||||
opstamp: u64,
|
||||
}
|
||||
|
||||
impl SegmentEntry {
|
||||
@@ -31,20 +30,14 @@ impl SegmentEntry {
|
||||
segment_meta: SegmentMeta,
|
||||
delete_cursor: DeleteCursor,
|
||||
delete_bitset: Option<BitSet>,
|
||||
opstamp: u64,
|
||||
) -> SegmentEntry {
|
||||
SegmentEntry {
|
||||
meta: segment_meta,
|
||||
delete_bitset,
|
||||
delete_cursor,
|
||||
opstamp,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn opstamp(&self) -> u64 {
|
||||
self.opstamp
|
||||
}
|
||||
|
||||
/// Return a reference to the segment entry deleted bitset.
|
||||
///
|
||||
/// `DocId` in this bitset are flagged as deleted.
|
||||
@@ -53,8 +46,7 @@ impl SegmentEntry {
|
||||
}
|
||||
|
||||
/// Set the `SegmentMeta` for this segment.
|
||||
pub fn set_meta(&mut self, opstamp: u64, segment_meta: SegmentMeta) {
|
||||
self.opstamp = opstamp;
|
||||
pub fn set_meta(&mut self, segment_meta: SegmentMeta) {
|
||||
self.meta = segment_meta;
|
||||
}
|
||||
|
||||
|
||||
@@ -11,47 +11,11 @@ use std::path::PathBuf;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
|
||||
use Result as TantivyResult;
|
||||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Provides a read-only view of the available segments.
|
||||
#[derive(Clone)]
|
||||
pub struct AvailableSegments {
|
||||
registers: Arc<RwLock<SegmentRegisters>>,
|
||||
}
|
||||
|
||||
impl AvailableSegments {
|
||||
pub fn committed(&self) -> Vec<SegmentMeta> {
|
||||
self.registers
|
||||
.read()
|
||||
.unwrap()
|
||||
.committed
|
||||
.segment_metas()
|
||||
}
|
||||
|
||||
pub fn soft_committed(&self) -> Vec<SegmentMeta> {
|
||||
self.registers
|
||||
.read()
|
||||
.unwrap()
|
||||
.soft_committed
|
||||
.segment_metas()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Default)]
|
||||
struct SegmentRegisters {
|
||||
uncommitted: HashMap<SegmentId, SegmentEntry>,
|
||||
uncommitted: SegmentRegister,
|
||||
committed: SegmentRegister,
|
||||
/// soft commits can advance committed segment to a future delete
|
||||
/// opstamp.
|
||||
///
|
||||
/// In that case the same `SegmentId` can appear in both `committed`
|
||||
/// and in `committed_in_the_future`.
|
||||
///
|
||||
/// We do not consider these segments for merges.
|
||||
soft_committed: SegmentRegister,
|
||||
/// `DeleteCursor`, positionned on the soft commit.
|
||||
delete_cursor: DeleteCursor,
|
||||
}
|
||||
|
||||
/// The segment manager stores the list of segments
|
||||
@@ -59,8 +23,9 @@ struct SegmentRegisters {
|
||||
///
|
||||
/// It guarantees the atomicity of the
|
||||
/// changes (merges especially)
|
||||
#[derive(Default)]
|
||||
pub struct SegmentManager {
|
||||
registers: Arc<RwLock<SegmentRegisters>>
|
||||
registers: RwLock<SegmentRegisters>,
|
||||
}
|
||||
|
||||
impl Debug for SegmentManager {
|
||||
@@ -81,17 +46,11 @@ pub fn get_mergeable_segments(
|
||||
let registers_lock = segment_manager.read();
|
||||
(
|
||||
registers_lock
|
||||
.soft_committed
|
||||
.committed
|
||||
.get_mergeable_segments(in_merge_segment_ids),
|
||||
registers_lock
|
||||
.uncommitted
|
||||
.values()
|
||||
.map(|segment_entry| segment_entry.meta())
|
||||
.filter(|segment_meta| {
|
||||
!in_merge_segment_ids.contains(&segment_meta.id())
|
||||
})
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
.get_mergeable_segments(in_merge_segment_ids),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -99,22 +58,21 @@ impl SegmentManager {
|
||||
pub fn from_segments(
|
||||
segment_metas: Vec<SegmentMeta>,
|
||||
delete_cursor: &DeleteCursor,
|
||||
opstamp: u64,
|
||||
) -> SegmentManager {
|
||||
SegmentManager {
|
||||
registers: Arc::new(RwLock::new(SegmentRegisters {
|
||||
uncommitted: HashMap::default(),
|
||||
committed: SegmentRegister::new(segment_metas.clone(), opstamp),
|
||||
soft_committed: SegmentRegister::new(segment_metas, opstamp),
|
||||
delete_cursor: delete_cursor.clone(),
|
||||
}))
|
||||
registers: RwLock::new(SegmentRegisters {
|
||||
uncommitted: SegmentRegister::default(),
|
||||
committed: SegmentRegister::new(segment_metas, delete_cursor),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn available_segments_view(&self) -> AvailableSegments {
|
||||
AvailableSegments {
|
||||
registers: self.registers.clone()
|
||||
}
|
||||
/// Returns all of the segment entries (committed or uncommitted)
|
||||
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
|
||||
let registers_lock = self.read();
|
||||
let mut segment_entries = registers_lock.uncommitted.segment_entries();
|
||||
segment_entries.extend(registers_lock.committed.segment_entries());
|
||||
segment_entries
|
||||
}
|
||||
|
||||
/// List the files that are useful to the index.
|
||||
@@ -150,76 +108,44 @@ impl SegmentManager {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock
|
||||
.committed
|
||||
.segment_metas()
|
||||
.segment_entries()
|
||||
.iter()
|
||||
.filter(|segment_meta| segment_meta.num_docs() == 0)
|
||||
.for_each(|segment_meta| {
|
||||
.filter(|segment| segment.meta().num_docs() == 0)
|
||||
.for_each(|segment| {
|
||||
registers_lock
|
||||
.committed
|
||||
.remove_segment(&segment_meta.id())
|
||||
});
|
||||
registers_lock
|
||||
.soft_committed
|
||||
.segment_metas()
|
||||
.iter()
|
||||
.filter(|segment_meta| segment_meta.num_docs() == 0)
|
||||
.for_each(|segment_meta| {
|
||||
registers_lock
|
||||
.committed
|
||||
.remove_segment(&segment_meta.id())
|
||||
.remove_segment(&segment.segment_id())
|
||||
});
|
||||
}
|
||||
|
||||
/// Returns all of the segment entries (soft committed or uncommitted)
|
||||
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
|
||||
let registers_lock = self.read();
|
||||
let mut segment_entries: Vec<SegmentEntry > = registers_lock.uncommitted.values().cloned().collect();
|
||||
segment_entries.extend(registers_lock.soft_committed.segment_entries(®isters_lock.delete_cursor).into_iter());
|
||||
segment_entries
|
||||
}
|
||||
|
||||
|
||||
pub fn commit(&self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
|
||||
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock.committed.clear();
|
||||
registers_lock.uncommitted.clear();
|
||||
registers_lock
|
||||
.committed
|
||||
.set_commit(opstamp, segment_entries.clone());
|
||||
registers_lock
|
||||
.soft_committed
|
||||
.set_commit(opstamp, segment_entries);
|
||||
registers_lock.delete_cursor.skip_to(opstamp);
|
||||
for segment_entry in segment_entries {
|
||||
registers_lock.committed.add_segment_entry(segment_entry);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn soft_commit(&self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock.uncommitted.clear();
|
||||
registers_lock
|
||||
.soft_committed
|
||||
.set_commit(opstamp, segment_entries);
|
||||
registers_lock.delete_cursor.skip_to(opstamp);
|
||||
}
|
||||
|
||||
/// Gets the list of segment_entries associated to a list of `segment_ids`.
|
||||
/// This method is used when starting a merge operations.
|
||||
/// Marks a list of segments as in merge.
|
||||
///
|
||||
/// Returns an error if some segments are missing, or if
|
||||
/// the `segment_ids` are not either all soft_committed or all
|
||||
/// the `segment_ids` are not either all committed or all
|
||||
/// uncommitted.
|
||||
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
|
||||
let registers_lock = self.read();
|
||||
let mut segment_entries = vec![];
|
||||
if segment_ids.iter().all(|segment_id| registers_lock.uncommitted.contains_key(segment_id)) {
|
||||
if registers_lock.uncommitted.contains_all(segment_ids) {
|
||||
for segment_id in segment_ids {
|
||||
let segment_entry = registers_lock.uncommitted
|
||||
.get(segment_id)
|
||||
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
||||
segment_entries.push(segment_entry.clone());
|
||||
segment_entries.push(segment_entry);
|
||||
}
|
||||
} else if registers_lock.soft_committed.contains_all(segment_ids) {
|
||||
} else if registers_lock.committed.contains_all(segment_ids) {
|
||||
for segment_id in segment_ids {
|
||||
let segment_entry = registers_lock.soft_committed
|
||||
.get(segment_id, ®isters_lock.delete_cursor)
|
||||
let segment_entry = registers_lock.committed
|
||||
.get(segment_id)
|
||||
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
||||
segment_entries.push(segment_entry);
|
||||
}
|
||||
@@ -234,32 +160,35 @@ impl SegmentManager {
|
||||
|
||||
pub fn add_segment(&self, segment_entry: SegmentEntry) {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock
|
||||
.uncommitted
|
||||
.insert(segment_entry.segment_id(), segment_entry);
|
||||
registers_lock.uncommitted.add_segment_entry(segment_entry);
|
||||
}
|
||||
|
||||
pub fn end_merge(
|
||||
&self,
|
||||
before_merge_segment_ids: &[SegmentId],
|
||||
after_merge_segment_entry: SegmentEntry
|
||||
after_merge_segment_entry: SegmentEntry,
|
||||
) {
|
||||
let mut registers_lock = self.write();
|
||||
|
||||
if before_merge_segment_ids.iter().all(|seg_id|
|
||||
registers_lock
|
||||
let target_register: &mut SegmentRegister = {
|
||||
if registers_lock
|
||||
.uncommitted
|
||||
.contains_key(seg_id))
|
||||
{
|
||||
for segment_id in before_merge_segment_ids {
|
||||
registers_lock.uncommitted.remove(&segment_id);
|
||||
.contains_all(before_merge_segment_ids)
|
||||
{
|
||||
&mut registers_lock.uncommitted
|
||||
} else if registers_lock
|
||||
.committed
|
||||
.contains_all(before_merge_segment_ids)
|
||||
{
|
||||
&mut registers_lock.committed
|
||||
} else {
|
||||
warn!("couldn't find segment in SegmentManager");
|
||||
return;
|
||||
}
|
||||
registers_lock.uncommitted.insert(after_merge_segment_entry.segment_id(),
|
||||
after_merge_segment_entry);
|
||||
} else {
|
||||
registers_lock.committed.receive_merge(&before_merge_segment_ids, &after_merge_segment_entry);
|
||||
registers_lock.soft_committed.receive_merge(&before_merge_segment_ids, &after_merge_segment_entry)
|
||||
};
|
||||
for segment_id in before_merge_segment_ids {
|
||||
target_register.remove_segment(segment_id);
|
||||
}
|
||||
target_register.add_segment_entry(after_merge_segment_entry);
|
||||
}
|
||||
|
||||
pub fn committed_segment_metas(&self) -> Vec<SegmentMeta> {
|
||||
|
||||
@@ -16,8 +16,7 @@ use std::fmt::{self, Debug, Formatter};
|
||||
/// merge candidates.
|
||||
#[derive(Default)]
|
||||
pub struct SegmentRegister {
|
||||
segment_states: HashMap<SegmentId, SegmentMeta>,
|
||||
opstamp_constraint: u64,
|
||||
segment_states: HashMap<SegmentId, SegmentEntry>,
|
||||
}
|
||||
|
||||
impl Debug for SegmentRegister {
|
||||
@@ -42,28 +41,23 @@ impl SegmentRegister {
|
||||
) -> Vec<SegmentMeta> {
|
||||
self.segment_states
|
||||
.values()
|
||||
.filter(|segment_meta| !in_merge_segment_ids.contains(&segment_meta.id()))
|
||||
.cloned()
|
||||
.filter(|segment_entry| !in_merge_segment_ids.contains(&segment_entry.segment_id()))
|
||||
.map(|segment_entry| segment_entry.meta().clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
|
||||
self.segment_states.values().cloned().collect()
|
||||
}
|
||||
|
||||
pub fn segment_metas(&self) -> Vec<SegmentMeta> {
|
||||
let mut segment_metas: Vec<SegmentMeta> = self
|
||||
let mut segment_ids: Vec<SegmentMeta> = self
|
||||
.segment_states
|
||||
.values()
|
||||
.cloned()
|
||||
.map(|segment_entry| segment_entry.meta().clone())
|
||||
.collect();
|
||||
segment_metas.sort_by_key(|meta| meta.id());
|
||||
segment_metas
|
||||
}
|
||||
|
||||
pub fn segment_entries(&self, delete_cursor: &DeleteCursor) -> Vec<SegmentEntry> {
|
||||
self.segment_states
|
||||
.values()
|
||||
.map(|segment_meta| {
|
||||
SegmentEntry::new(segment_meta.clone(), delete_cursor.clone(), None, self.opstamp_constraint)
|
||||
})
|
||||
.collect()
|
||||
segment_ids.sort_by_key(|meta| meta.id());
|
||||
segment_ids
|
||||
}
|
||||
|
||||
pub fn contains_all(&self, segment_ids: &[SegmentId]) -> bool {
|
||||
@@ -72,77 +66,27 @@ impl SegmentRegister {
|
||||
.all(|segment_id| self.segment_states.contains_key(segment_id))
|
||||
}
|
||||
|
||||
pub fn receive_merge(&mut self,
|
||||
before_merge_segment_ids: &[SegmentId],
|
||||
after_merge_segment_entry: &SegmentEntry) {
|
||||
if after_merge_segment_entry.opstamp() != self.opstamp_constraint {
|
||||
return;
|
||||
}
|
||||
if !self.contains_all(before_merge_segment_ids) {
|
||||
return;
|
||||
}
|
||||
for segment_id in before_merge_segment_ids {
|
||||
self.segment_states.remove(segment_id);
|
||||
}
|
||||
self.register_segment_entry(after_merge_segment_entry.clone());
|
||||
}
|
||||
|
||||
/// Registers a `SegmentEntry`.
|
||||
///
|
||||
/// If a segment entry associated to this `SegmentId` is already there,
|
||||
/// override it with the new `SegmentEntry`.
|
||||
pub fn register_segment_entry(&mut self, segment_entry: SegmentEntry) {
|
||||
if self.opstamp_constraint != segment_entry.opstamp() {
|
||||
panic!(format!(
|
||||
"Invalid segment. Expect opstamp {}, got {}.",
|
||||
self.opstamp_constraint,
|
||||
segment_entry.opstamp()
|
||||
));
|
||||
}
|
||||
if segment_entry.meta().num_docs() == 0 {
|
||||
return;
|
||||
}
|
||||
pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) {
|
||||
let segment_id = segment_entry.segment_id();
|
||||
// Check that we are ok with deletes.
|
||||
self.segment_states.insert(segment_id, segment_entry.meta().clone());
|
||||
}
|
||||
|
||||
pub fn set_commit(&mut self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
|
||||
self.segment_states.clear();
|
||||
self.opstamp_constraint = opstamp;
|
||||
for segment_entry in segment_entries {
|
||||
self.register_segment_entry(segment_entry);
|
||||
}
|
||||
self.segment_states.insert(segment_id, segment_entry);
|
||||
}
|
||||
|
||||
pub fn remove_segment(&mut self, segment_id: &SegmentId) {
|
||||
self.segment_states.remove(&segment_id);
|
||||
self.segment_states.remove(segment_id);
|
||||
}
|
||||
|
||||
pub fn get(&self, segment_id: &SegmentId, delete_cursor: &DeleteCursor) -> Option<SegmentEntry> {
|
||||
self.segment_states
|
||||
.get(&segment_id)
|
||||
.map(|segment_meta|
|
||||
SegmentEntry::new(
|
||||
segment_meta.clone(),
|
||||
delete_cursor.clone(),
|
||||
None,
|
||||
self.opstamp_constraint
|
||||
))
|
||||
pub fn get(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
self.segment_states.get(segment_id).cloned()
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
segment_metas: Vec<SegmentMeta>,
|
||||
opstamp: u64,
|
||||
) -> SegmentRegister {
|
||||
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
|
||||
let mut segment_states = HashMap::new();
|
||||
for segment_meta in segment_metas {
|
||||
segment_states.insert(segment_meta.id(), segment_meta);
|
||||
}
|
||||
SegmentRegister {
|
||||
segment_states,
|
||||
opstamp_constraint: opstamp,
|
||||
let segment_id = segment_meta.id();
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None);
|
||||
segment_states.insert(segment_id, segment_entry);
|
||||
}
|
||||
SegmentRegister { segment_states }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,22 +115,22 @@ mod tests {
|
||||
let segment_id_merged = SegmentId::generate_random();
|
||||
|
||||
{
|
||||
let segment_meta = SegmentMeta::new(segment_id_a, 1u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None, 0u64);
|
||||
segment_register.register_segment_entry(segment_entry);
|
||||
let segment_meta = SegmentMeta::new(segment_id_a, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
|
||||
{
|
||||
let segment_meta = SegmentMeta::new(segment_id_b, 2u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None, 0u64);
|
||||
segment_register.register_segment_entry(segment_entry);
|
||||
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
segment_register.remove_segment(&segment_id_a);
|
||||
segment_register.remove_segment(&segment_id_b);
|
||||
{
|
||||
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 3u32);
|
||||
let segment_entry =
|
||||
SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None, 0u64);
|
||||
segment_register.receive_merge(&[segment_id_a, segment_id_b], &segment_entry);
|
||||
segment_register.register_segment_entry(segment_entry);
|
||||
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]);
|
||||
}
|
||||
|
||||
@@ -125,7 +125,7 @@ fn perform_merge(
|
||||
|
||||
let segment_meta = SegmentMeta::new(merged_segment.id(), num_docs);
|
||||
|
||||
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None, target_opstamp);
|
||||
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None);
|
||||
Ok(after_merge_segment_entry)
|
||||
}
|
||||
|
||||
@@ -155,11 +155,8 @@ impl SegmentUpdater {
|
||||
stamper: Stamper,
|
||||
delete_cursor: &DeleteCursor,
|
||||
) -> Result<SegmentUpdater> {
|
||||
|
||||
let index_meta = index.load_metas()?;
|
||||
let segments = index.searchable_segment_metas()?;
|
||||
let opstamp = index_meta.opstamp;
|
||||
let segment_manager = SegmentManager::from_segments(segments, delete_cursor, opstamp);
|
||||
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
|
||||
let pool = CpuPoolBuilder::new()
|
||||
.name_prefix("segment_updater")
|
||||
.pool_size(1)
|
||||
@@ -283,30 +280,14 @@ impl SegmentUpdater {
|
||||
.garbage_collect(|| self.0.segment_manager.list_files());
|
||||
}
|
||||
|
||||
pub fn commit(&self, opstamp: u64, payload: Option<String>, soft: bool) -> Result<()> {
|
||||
pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
|
||||
self.run_async(move |segment_updater| {
|
||||
if segment_updater.is_alive() {
|
||||
let segment_entries = segment_updater
|
||||
.purge_deletes(opstamp)
|
||||
.expect("Failed purge deletes");
|
||||
if soft {
|
||||
// Soft commit.
|
||||
//
|
||||
// The list `segment_entries` above is what we might want to use as searchable
|
||||
// segment. However, we do not want to mark them as committed, and we want
|
||||
// to keep the current set of committed segment.
|
||||
segment_updater.0.segment_manager.soft_commit(opstamp, segment_entries);
|
||||
// ... We do not save the meta file.
|
||||
} else {
|
||||
// Hard_commit. We register the new segment entries as committed.
|
||||
segment_updater
|
||||
.0
|
||||
.segment_manager
|
||||
.commit(opstamp, segment_entries);
|
||||
// TODO error handling.
|
||||
segment_updater.save_metas(opstamp, payload);
|
||||
segment_updater.0.index.directory().flush().unwrap();
|
||||
}
|
||||
segment_updater.0.segment_manager.commit(segment_entries);
|
||||
segment_updater.save_metas(opstamp, payload);
|
||||
segment_updater.garbage_collect_files_exec();
|
||||
segment_updater.consider_merge_options();
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use postings::compression::AlignedBuffer;
|
||||
|
||||
/// This modules define the logic used to search for a doc in a given
|
||||
/// block. (at most 128 docs)
|
||||
///
|
||||
@@ -6,7 +8,7 @@
|
||||
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
mod sse2 {
|
||||
use postings::compression::COMPRESSION_BLOCK_SIZE;
|
||||
use postings::compression::{AlignedBuffer, COMPRESSION_BLOCK_SIZE};
|
||||
use std::arch::x86_64::__m128i as DataType;
|
||||
use std::arch::x86_64::_mm_add_epi32 as op_add;
|
||||
use std::arch::x86_64::_mm_cmplt_epi32 as op_lt;
|
||||
@@ -23,9 +25,9 @@ mod sse2 {
|
||||
///
|
||||
/// There is no early exit here. We simply count the
|
||||
/// number of elements that are `< target`.
|
||||
pub fn linear_search_sse2_128(arr: &[u32], target: u32) -> usize {
|
||||
pub(crate) fn linear_search_sse2_128(arr: &AlignedBuffer, target: u32) -> usize {
|
||||
unsafe {
|
||||
let ptr = arr.as_ptr() as *const DataType;
|
||||
let ptr = arr as *const AlignedBuffer as *const DataType;
|
||||
let vkey = set1(target as i32);
|
||||
let mut cnt = set0();
|
||||
// We work over 4 `__m128i` at a time.
|
||||
@@ -47,14 +49,16 @@ mod sse2 {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::linear_search_sse2_128;
|
||||
use postings::compression::{AlignedBuffer, COMPRESSION_BLOCK_SIZE};
|
||||
|
||||
#[test]
|
||||
fn test_linear_search_sse2_128_u32() {
|
||||
for i in 0..23 {
|
||||
dbg!(i);
|
||||
let arr: Vec<u32> = (0..128).map(|el| el * 2 + 1 << 18).collect();
|
||||
assert_eq!(linear_search_sse2_128(&arr, arr[64] + 1), 65);
|
||||
let mut block = [0u32; COMPRESSION_BLOCK_SIZE];
|
||||
for el in 0u32..128u32 {
|
||||
block[el as usize] = el * 2 + 1 << 18;
|
||||
}
|
||||
let target = block[64] + 1;
|
||||
assert_eq!(linear_search_sse2_128(&AlignedBuffer(block), target), 65);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -127,17 +131,21 @@ impl BlockSearcher {
|
||||
/// then we use a different implementation that does an exhaustive linear search over
|
||||
/// the full block whenever the block is full (`len == 128`). It is surprisingly faster, most likely because of the lack
|
||||
/// of branch.
|
||||
pub fn search_in_block(&self, block_docs: &[u32], start: usize, target: u32) -> usize {
|
||||
pub(crate) fn search_in_block(
|
||||
self,
|
||||
block_docs: &AlignedBuffer,
|
||||
len: usize,
|
||||
start: usize,
|
||||
target: u32,
|
||||
) -> usize {
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
{
|
||||
use postings::compression::COMPRESSION_BLOCK_SIZE;
|
||||
if *self == BlockSearcher::SSE2 {
|
||||
if block_docs.len() == COMPRESSION_BLOCK_SIZE {
|
||||
return sse2::linear_search_sse2_128(block_docs, target);
|
||||
}
|
||||
if self == BlockSearcher::SSE2 && len == COMPRESSION_BLOCK_SIZE {
|
||||
return sse2::linear_search_sse2_128(block_docs, target);
|
||||
}
|
||||
}
|
||||
start + galloping(&block_docs[start..], target)
|
||||
start + galloping(&block_docs.0[start..len], target)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,6 +166,7 @@ mod tests {
|
||||
use super::exponential_search;
|
||||
use super::linear_search;
|
||||
use super::BlockSearcher;
|
||||
use postings::compression::{AlignedBuffer, COMPRESSION_BLOCK_SIZE};
|
||||
|
||||
#[test]
|
||||
fn test_linear_search() {
|
||||
@@ -186,8 +195,19 @@ mod tests {
|
||||
|
||||
fn util_test_search_in_block(block_searcher: BlockSearcher, block: &[u32], target: u32) {
|
||||
let cursor = search_in_block_trivial_but_slow(block, target);
|
||||
assert!(block.len() < COMPRESSION_BLOCK_SIZE);
|
||||
let mut output_buffer = [u32::max_value(); COMPRESSION_BLOCK_SIZE];
|
||||
output_buffer[..block.len()].copy_from_slice(block);
|
||||
for i in 0..cursor {
|
||||
assert_eq!(block_searcher.search_in_block(block, i, target), cursor);
|
||||
assert_eq!(
|
||||
block_searcher.search_in_block(
|
||||
&AlignedBuffer(output_buffer),
|
||||
block.len(),
|
||||
i,
|
||||
target
|
||||
),
|
||||
cursor
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -46,11 +46,11 @@ impl BlockEncoder {
|
||||
/// We ensure that the OutputBuffer is align on 128 bits
|
||||
/// in order to run SSE2 linear search on it.
|
||||
#[repr(align(128))]
|
||||
struct OutputBuffer([u32; COMPRESSION_BLOCK_SIZE + 1]);
|
||||
pub(crate) struct AlignedBuffer(pub [u32; COMPRESSION_BLOCK_SIZE]);
|
||||
|
||||
pub struct BlockDecoder {
|
||||
bitpacker: BitPacker4x,
|
||||
output: OutputBuffer,
|
||||
output: AlignedBuffer,
|
||||
pub output_len: usize,
|
||||
}
|
||||
|
||||
@@ -60,11 +60,9 @@ impl BlockDecoder {
|
||||
}
|
||||
|
||||
pub fn with_val(val: u32) -> BlockDecoder {
|
||||
let mut output = [val; COMPRESSION_BLOCK_SIZE + 1];
|
||||
output[COMPRESSION_BLOCK_SIZE] = 0u32;
|
||||
BlockDecoder {
|
||||
bitpacker: BitPacker4x::new(),
|
||||
output: OutputBuffer(output),
|
||||
output: AlignedBuffer([val; COMPRESSION_BLOCK_SIZE]),
|
||||
output_len: 0,
|
||||
}
|
||||
}
|
||||
@@ -91,6 +89,11 @@ impl BlockDecoder {
|
||||
&self.output.0[..self.output_len]
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn output_aligned(&self) -> (&AlignedBuffer, usize) {
|
||||
(&self.output, self.output_len)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn output(&self, idx: usize) -> u32 {
|
||||
self.output.0[idx]
|
||||
|
||||
@@ -55,13 +55,15 @@ pub mod tests {
|
||||
use fieldnorm::FieldNormReader;
|
||||
use indexer::operation::AddOperation;
|
||||
use indexer::SegmentWriter;
|
||||
use merge_policy::NoMergePolicy;
|
||||
use query::Scorer;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use schema::Field;
|
||||
use schema::IndexRecordOption;
|
||||
use schema::{Document, Schema, Term, INDEXED, STRING, TEXT};
|
||||
use schema::{Field, TextOptions};
|
||||
use schema::{IndexRecordOption, TextFieldIndexing};
|
||||
use std::iter;
|
||||
use tokenizer::{SimpleTokenizer, MAX_TOKEN_LEN};
|
||||
use DocId;
|
||||
use Score;
|
||||
|
||||
@@ -160,6 +162,52 @@ pub mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_drop_token_that_are_too_long() {
|
||||
let ok_token_text: String = iter::repeat('A').take(MAX_TOKEN_LEN).collect();
|
||||
let mut exceeding_token_text: String = iter::repeat('A').take(MAX_TOKEN_LEN + 1).collect();
|
||||
exceeding_token_text.push_str(" hello");
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_options = TextOptions::default().set_indexing_options(
|
||||
TextFieldIndexing::default()
|
||||
.set_index_option(IndexRecordOption::WithFreqsAndPositions)
|
||||
.set_tokenizer("simple_no_truncation"),
|
||||
);
|
||||
let text_field = schema_builder.add_text_field("text", text_options);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema.clone());
|
||||
index
|
||||
.tokenizers()
|
||||
.register("simple_no_truncation", SimpleTokenizer);
|
||||
let reader = index.reader().unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
index_writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
{
|
||||
index_writer.add_document(doc!(text_field=>exceeding_token_text));
|
||||
index_writer.commit().unwrap();
|
||||
reader.reload().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
let segment_reader = searcher.segment_reader(0u32);
|
||||
let inverted_index = segment_reader.inverted_index(text_field);
|
||||
assert_eq!(inverted_index.terms().num_terms(), 1);
|
||||
let mut bytes = vec![];
|
||||
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
|
||||
assert_eq!(&bytes, b"hello");
|
||||
}
|
||||
{
|
||||
index_writer.add_document(doc!(text_field=>ok_token_text.clone()));
|
||||
index_writer.commit().unwrap();
|
||||
reader.reload().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
let segment_reader = searcher.segment_reader(1u32);
|
||||
let inverted_index = segment_reader.inverted_index(text_field);
|
||||
assert_eq!(inverted_index.terms().num_terms(), 1);
|
||||
let mut bytes = vec![];
|
||||
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
|
||||
assert_eq!(&bytes[..], ok_token_text.as_bytes());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_position_and_fieldnorm1() {
|
||||
let mut positions = Vec::new();
|
||||
|
||||
@@ -12,8 +12,8 @@ use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::DerefMut;
|
||||
use termdict::TermOrdinal;
|
||||
use tokenizer::Token;
|
||||
use tokenizer::TokenStream;
|
||||
use tokenizer::{Token, MAX_TOKEN_LEN};
|
||||
use DocId;
|
||||
use Result;
|
||||
|
||||
@@ -210,8 +210,18 @@ pub trait PostingsWriter {
|
||||
) -> u32 {
|
||||
let mut term = Term::for_field(field);
|
||||
let mut sink = |token: &Token| {
|
||||
term.set_text(token.text.as_str());
|
||||
self.subscribe(term_index, doc_id, token.position as u32, &term, heap);
|
||||
// We skip all tokens with a len greater than u16.
|
||||
if token.text.len() <= MAX_TOKEN_LEN {
|
||||
term.set_text(token.text.as_str());
|
||||
self.subscribe(term_index, doc_id, token.position as u32, &term, heap);
|
||||
} else {
|
||||
info!(
|
||||
"A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
|
||||
MAX_TOKEN_LEN in the documentation for more information.",
|
||||
token.text.len(),
|
||||
MAX_TOKEN_LEN
|
||||
);
|
||||
}
|
||||
};
|
||||
token_stream.process(&mut sink)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use common::{BinarySerializable, VInt};
|
||||
use docset::{DocSet, SkipResult};
|
||||
use owned_read::OwnedRead;
|
||||
use positions::PositionReader;
|
||||
use postings::compression::compressed_block_size;
|
||||
use postings::compression::{compressed_block_size, AlignedBuffer};
|
||||
use postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE};
|
||||
use postings::serializer::PostingsSerializer;
|
||||
use postings::BlockSearcher;
|
||||
@@ -130,9 +130,11 @@ impl DocSet for SegmentPostings {
|
||||
// next needs to be called a first time to point to the correct element.
|
||||
#[inline]
|
||||
fn advance(&mut self) -> bool {
|
||||
if self.position_computer.is_some() {
|
||||
if self.position_computer.is_some() && self.cur < COMPRESSION_BLOCK_SIZE {
|
||||
let term_freq = self.term_freq() as usize;
|
||||
self.position_computer.as_mut().unwrap().add_skip(term_freq);
|
||||
if let Some(position_computer) = self.position_computer.as_mut() {
|
||||
position_computer.add_skip(term_freq);
|
||||
}
|
||||
}
|
||||
self.cur += 1;
|
||||
if self.cur >= self.block_cursor.block_len() {
|
||||
@@ -167,7 +169,6 @@ impl DocSet for SegmentPostings {
|
||||
|
||||
// skip blocks until one that might contain the target
|
||||
// check if we need to go to the next block
|
||||
let need_positions = self.position_computer.is_some();
|
||||
let mut sum_freqs_skipped: u32 = 0;
|
||||
if !self
|
||||
.block_cursor
|
||||
@@ -181,7 +182,7 @@ impl DocSet for SegmentPostings {
|
||||
// we are not in the right block.
|
||||
//
|
||||
// First compute all of the freqs skipped from the current block.
|
||||
if need_positions {
|
||||
if self.position_computer.is_some() {
|
||||
sum_freqs_skipped = self.block_cursor.freqs()[self.cur..].iter().sum();
|
||||
match self.block_cursor.skip_to(target) {
|
||||
BlockSegmentPostingsSkipResult::Success(block_skip_freqs) => {
|
||||
@@ -200,24 +201,21 @@ impl DocSet for SegmentPostings {
|
||||
self.cur = 0;
|
||||
}
|
||||
|
||||
let cur = self.cur;
|
||||
|
||||
// we're in the right block now, start with an exponential search
|
||||
let block_docs = self.block_cursor.docs();
|
||||
let (output, len) = self.block_cursor.docs_aligned();
|
||||
let new_cur = self
|
||||
.block_searcher
|
||||
.search_in_block(&block_docs, self.cur, target);
|
||||
if need_positions {
|
||||
sum_freqs_skipped += self.block_cursor.freqs()[self.cur..new_cur]
|
||||
.iter()
|
||||
.sum::<u32>();
|
||||
self.position_computer
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.add_skip(sum_freqs_skipped as usize);
|
||||
.search_in_block(&output, len, cur, target);
|
||||
if let Some(position_computer) = self.position_computer.as_mut() {
|
||||
sum_freqs_skipped += self.block_cursor.freqs()[cur..new_cur].iter().sum::<u32>();
|
||||
position_computer.add_skip(sum_freqs_skipped as usize);
|
||||
}
|
||||
self.cur = new_cur;
|
||||
|
||||
// `doc` is now the first element >= `target`
|
||||
let doc = block_docs[new_cur];
|
||||
let doc = output.0[new_cur];
|
||||
debug_assert!(doc >= target);
|
||||
if doc == target {
|
||||
SkipResult::Reached
|
||||
@@ -227,12 +225,16 @@ impl DocSet for SegmentPostings {
|
||||
}
|
||||
|
||||
/// Return the current document's `DocId`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Will panics if called without having called advance before.
|
||||
#[inline]
|
||||
fn doc(&self) -> DocId {
|
||||
let docs = self.block_cursor.docs();
|
||||
debug_assert!(
|
||||
self.cur < docs.len(),
|
||||
"Have you forgotten to call `.advance()` at least once before calling .doc()."
|
||||
"Have you forgotten to call `.advance()` at least once before calling `.doc()` ."
|
||||
);
|
||||
docs[self.cur]
|
||||
}
|
||||
@@ -264,17 +266,33 @@ impl HasLen for SegmentPostings {
|
||||
}
|
||||
|
||||
impl Postings for SegmentPostings {
|
||||
/// Returns the frequency associated to the current document.
|
||||
/// If the schema is set up so that no frequency have been encoded,
|
||||
/// this method should always return 1.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Will panics if called without having called advance before.
|
||||
fn term_freq(&self) -> u32 {
|
||||
debug_assert!(
|
||||
// Here we do not use the len of `freqs()`
|
||||
// because it is actually ok to request for the freq of doc
|
||||
// even if no frequency were encoded for the field.
|
||||
//
|
||||
// In that case we hit the block just as if the frequency had been
|
||||
// decoded. The block is simply prefilled by the value 1.
|
||||
self.cur < COMPRESSION_BLOCK_SIZE,
|
||||
"Have you forgotten to call `.advance()` at least once before calling \
|
||||
`.term_freq()`."
|
||||
);
|
||||
self.block_cursor.freq(self.cur)
|
||||
}
|
||||
|
||||
fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
|
||||
if self.position_computer.is_some() {
|
||||
output.resize(self.term_freq() as usize, 0u32);
|
||||
self.position_computer
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.positions_with_offset(offset, &mut output[..])
|
||||
let term_freq = self.term_freq() as usize;
|
||||
if let Some(position_comp) = self.position_computer.as_mut() {
|
||||
output.resize(term_freq, 0u32);
|
||||
position_comp.positions_with_offset(offset, &mut output[..]);
|
||||
} else {
|
||||
output.clear();
|
||||
}
|
||||
@@ -396,6 +414,10 @@ impl BlockSegmentPostings {
|
||||
self.doc_decoder.output_array()
|
||||
}
|
||||
|
||||
pub(crate) fn docs_aligned(&self) -> (&AlignedBuffer, usize) {
|
||||
self.doc_decoder.output_aligned()
|
||||
}
|
||||
|
||||
/// Return the document at index `idx` of the block.
|
||||
#[inline]
|
||||
pub fn doc(&self, idx: usize) -> u32 {
|
||||
@@ -592,6 +614,7 @@ mod tests {
|
||||
use common::HasLen;
|
||||
use core::Index;
|
||||
use docset::DocSet;
|
||||
use postings::postings::Postings;
|
||||
use schema::IndexRecordOption;
|
||||
use schema::Schema;
|
||||
use schema::Term;
|
||||
@@ -608,6 +631,18 @@ mod tests {
|
||||
assert_eq!(postings.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "Have you forgotten to call `.advance()`")]
|
||||
fn test_panic_if_doc_called_before_advance() {
|
||||
SegmentPostings::empty().doc();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "Have you forgotten to call `.advance()`")]
|
||||
fn test_panic_if_freq_called_before_advance() {
|
||||
SegmentPostings::empty().term_freq();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_block_segment_postings() {
|
||||
let mut postings = BlockSegmentPostings::empty();
|
||||
|
||||
@@ -14,7 +14,7 @@ use termdict::{TermDictionaryBuilder, TermOrdinal};
|
||||
use DocId;
|
||||
use Result;
|
||||
|
||||
/// `PostingsSerializer` is in charge of serializing
|
||||
/// `InvertedIndexSerializer` is in charge of serializing
|
||||
/// postings on disk, in the
|
||||
/// * `.idx` (inverted index)
|
||||
/// * `.pos` (positions file)
|
||||
@@ -54,7 +54,7 @@ pub struct InvertedIndexSerializer {
|
||||
}
|
||||
|
||||
impl InvertedIndexSerializer {
|
||||
/// Open a new `PostingsSerializer` for the given segment
|
||||
/// Open a new `InvertedIndexSerializer` for the given segment
|
||||
fn create(
|
||||
terms_write: CompositeWrite<WritePtr>,
|
||||
postings_write: CompositeWrite<WritePtr>,
|
||||
|
||||
@@ -98,4 +98,20 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_query_count_when_there_are_deletes() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_with_num_threads(1, 5_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"a b"));
|
||||
index_writer.add_document(doc!(text_field=>"a c"));
|
||||
index_writer.delete_term(Term::from_field_text(text_field, "b"));
|
||||
index_writer.commit().unwrap();
|
||||
let term_a = Term::from_field_text(text_field, "a");
|
||||
let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
|
||||
let reader = index.reader().unwrap();
|
||||
assert_eq!(term_query.count(&*reader.searcher()).unwrap(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,15 +39,15 @@ impl Weight for TermWeight {
|
||||
}
|
||||
|
||||
fn count(&self, reader: &SegmentReader) -> Result<u32> {
|
||||
if reader.num_deleted_docs() == 0 {
|
||||
if let Some(delete_bitset) = reader.delete_bitset() {
|
||||
Ok(self.scorer(reader)?.count(delete_bitset))
|
||||
} else {
|
||||
let field = self.term.field();
|
||||
Ok(reader
|
||||
.inverted_index(field)
|
||||
.get_term_info(&self.term)
|
||||
.map(|term_info| term_info.doc_freq)
|
||||
.unwrap_or(0))
|
||||
} else {
|
||||
Ok(self.scorer(reader)?.count())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,7 +145,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn count(&mut self) -> u32 {
|
||||
fn count_including_deleted(&mut self) -> u32 {
|
||||
let mut count = self.bitsets[self.cursor..HORIZON_NUM_TINYBITSETS]
|
||||
.iter()
|
||||
.map(|bitset| bitset.len())
|
||||
@@ -163,6 +163,8 @@ where
|
||||
count
|
||||
}
|
||||
|
||||
// TODO implement `count` efficiently.
|
||||
|
||||
fn skip_next(&mut self, target: DocId) -> SkipResult {
|
||||
if !self.advance() {
|
||||
return SkipResult::End;
|
||||
@@ -300,7 +302,7 @@ mod tests {
|
||||
count += 1;
|
||||
}
|
||||
assert!(!union_expected.advance());
|
||||
assert_eq!(count, make_union().count());
|
||||
assert_eq!(count, make_union().count_including_deleted());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -13,6 +13,11 @@ pub trait Weight: Send + Sync + 'static {
|
||||
|
||||
/// Returns the number documents within the given `SegmentReader`.
|
||||
fn count(&self, reader: &SegmentReader) -> Result<u32> {
|
||||
Ok(self.scorer(reader)?.count())
|
||||
let mut scorer = self.scorer(reader)?;
|
||||
if let Some(delete_bitset) = reader.delete_bitset() {
|
||||
Ok(scorer.count(delete_bitset))
|
||||
} else {
|
||||
Ok(scorer.count_including_deleted())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ use Index;
|
||||
use Result;
|
||||
use Searcher;
|
||||
use SegmentReader;
|
||||
use schema::Schema;
|
||||
|
||||
/// Defines when a new version of the index should be reloaded.
|
||||
///
|
||||
@@ -159,11 +158,6 @@ pub struct IndexReader {
|
||||
}
|
||||
|
||||
impl IndexReader {
|
||||
|
||||
pub fn schema(&self) -> Schema {
|
||||
self.inner.index.schema()
|
||||
}
|
||||
|
||||
/// Update searchers so that they reflect the state of the last
|
||||
/// `.commit()`.
|
||||
///
|
||||
|
||||
@@ -97,6 +97,8 @@
|
||||
//! If you built your schema programmatically, a complete example
|
||||
//! could like this for instance.
|
||||
//!
|
||||
//! Note that tokens with a len greater or equal to [`MAX_TOKEN_LEN`](./constant.MAX_TOKEN_LEN.html).
|
||||
//!
|
||||
//! # Example
|
||||
//!
|
||||
//! ```
|
||||
@@ -157,6 +159,13 @@ pub use self::tokenizer::BoxedTokenizer;
|
||||
pub use self::tokenizer::{Token, TokenFilter, TokenStream, Tokenizer};
|
||||
pub use self::tokenizer_manager::TokenizerManager;
|
||||
|
||||
/// Maximum authorized len (in bytes) for a token.
|
||||
///
|
||||
/// Tokenizer are in charge of not emitting tokens larger than this value.
|
||||
/// Currently, if a faulty tokenizer implementation emits tokens with a length larger than
|
||||
/// `2^16 - 1 - 4`, the token will simply be ignored downstream.
|
||||
pub const MAX_TOKEN_LEN: usize = u16::max_value() as usize - 4;
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use super::{
|
||||
@@ -228,27 +237,27 @@ pub mod tests {
|
||||
fn test_non_en_tokenizer() {
|
||||
let tokenizer_manager = TokenizerManager::default();
|
||||
tokenizer_manager.register(
|
||||
"es_stem",
|
||||
"el_stem",
|
||||
SimpleTokenizer
|
||||
.filter(RemoveLongFilter::limit(40))
|
||||
.filter(LowerCaser)
|
||||
.filter(Stemmer::new(Language::Spanish)),
|
||||
.filter(Stemmer::new(Language::Greek)),
|
||||
);
|
||||
let en_tokenizer = tokenizer_manager.get("es_stem").unwrap();
|
||||
let en_tokenizer = tokenizer_manager.get("el_stem").unwrap();
|
||||
let mut tokens: Vec<Token> = vec![];
|
||||
{
|
||||
let mut add_token = |token: &Token| {
|
||||
tokens.push(token.clone());
|
||||
};
|
||||
en_tokenizer
|
||||
.token_stream("Hola, feliz contribuyente!")
|
||||
.token_stream("Καλημέρα, χαρούμενε φορολογούμενε!")
|
||||
.process(&mut add_token);
|
||||
}
|
||||
|
||||
assert_eq!(tokens.len(), 3);
|
||||
assert_token(&tokens[0], 0, "hola", 0, 4);
|
||||
assert_token(&tokens[1], 1, "feliz", 6, 11);
|
||||
assert_token(&tokens[2], 2, "contribuyent", 12, 25);
|
||||
assert_token(&tokens[0], 0, "καλημερ", 0, 16);
|
||||
assert_token(&tokens[1], 1, "χαρουμεν", 18, 36);
|
||||
assert_token(&tokens[2], 2, "φορολογουμεν", 37, 63);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
use super::{Token, TokenFilter, TokenStream};
|
||||
use rust_stemmers::{self, Algorithm};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Available stemmer languages.
|
||||
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone)]
|
||||
@@ -57,14 +56,14 @@ impl Language {
|
||||
/// Tokens are expected to be lowercased beforehand.
|
||||
#[derive(Clone)]
|
||||
pub struct Stemmer {
|
||||
stemmer_algorithm: Arc<Algorithm>,
|
||||
stemmer_algorithm: Algorithm,
|
||||
}
|
||||
|
||||
impl Stemmer {
|
||||
/// Creates a new Stemmer `TokenFilter` for a given language algorithm.
|
||||
pub fn new(language: Language) -> Stemmer {
|
||||
Stemmer {
|
||||
stemmer_algorithm: Arc::new(language.algorithm()),
|
||||
stemmer_algorithm: language.algorithm(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -83,7 +82,7 @@ where
|
||||
type ResultTokenStream = StemmerTokenStream<TailTokenStream>;
|
||||
|
||||
fn transform(&self, token_stream: TailTokenStream) -> Self::ResultTokenStream {
|
||||
let inner_stemmer = rust_stemmers::Stemmer::create(Algorithm::English);
|
||||
let inner_stemmer = rust_stemmers::Stemmer::create(self.stemmer_algorithm);
|
||||
StemmerTokenStream::wrap(inner_stemmer, token_stream)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user