Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-07 17:42:55 +00:00)

Compare commits: `0.9` ... `softcommit` (22 commits)
| SHA1 |
|---|
| 4c846b1202 |
| fac0013454 |
| 95db5d9999 |
| e3abb4481b |
| bfa61d2f2f |
| 6c0e621fdb |
| a8cc5208f1 |
| 83eb0d0cb7 |
| ee6e273365 |
| 7f0372fa97 |
| f8fdf68fcb |
| c00e95cd04 |
| a623d8f6d9 |
| b3ede2dd7e |
| b68686f040 |
| 629d3fb37f |
| f513f10e05 |
| f262d4cc22 |
| 91e89714f4 |
| 6fd3cb1254 |
| 549b4e66e5 |
| d9b2bf98e2 |
```diff
@@ -29,7 +29,7 @@ addons:
 matrix:
   include:
     # Android
-    - env: TARGET=aarch64-linux-android
+    - env: TARGET=aarch64-linux-android DISABLE_TESTS
     #- env: TARGET=arm-linux-androideabi DISABLE_TESTS=1
     #- env: TARGET=armv7-linux-androideabi DISABLE_TESTS=1
     #- env: TARGET=i686-linux-android DISABLE_TESTS=1
```
```diff
@@ -15,7 +15,7 @@ previous index format.*
   for int fields. (@fulmicoton)
 - Added DateTime field (@barrotsteindev)
 - Added IndexReader. By default, index is reloaded automatically upon new commits (@fulmicoton)
+- SIMD linear search within blocks (@fulmicoton)

 Tantivy 0.8.2
 =====================
```
README.md (21 lines changed)

```diff
@@ -17,6 +17,7 @@
 [](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/6)
 [](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/7)
+[](https://www.patreon.com/fulmicoton)

 **Tantivy** is a **full text search engine library** written in rust.
@@ -27,6 +28,14 @@ to build such a search engine.

 Tantivy is, in fact, strongly inspired by Lucene's design.

+# Benchmark
+
+Tantivy is typically faster than Lucene, but the results will depend on
+the nature of the queries in your workload.
+
+The following [benchmark](https://tantivy-search.github.io/bench/) breaks down
+performance for different types of queries / collections.
+
 # Features

 - Full-text search
@@ -87,6 +96,14 @@ To check out and run tests, you can simply run :
 Some tests will not run with just `cargo test` because of `fail-rs`.
 To run the tests exhaustively, run `./run-tests.sh`.

-# Contribute
+# How can I support this project ?

-Send me an email (paul.masurel at gmail.com) if you want to contribute to tantivy.
+There are many ways to support this project.
+
+- If you use tantivy, tell us about your experience on [gitter](https://gitter.im/tantivy-search/tantivy) or by email (paul.masurel@gmail.com)
+- Report bugs
+- Write a blog post
+- Complete documentation
+- Contribute code (you can join [our gitter](https://gitter.im/tantivy-search/tantivy))
+- Talk about tantivy around you
+- Drop a word on [](https://saythanks.io/to/fulmicoton) or even [](https://www.patreon.com/fulmicoton)
```
```diff
@@ -13,7 +13,11 @@ pub use self::serialize::{BinarySerializable, FixedSize};
 pub use self::vint::{read_u32_vint, serialize_vint_u32, write_u32_vint, VInt};
 pub use byteorder::LittleEndian as Endianness;

-use std::io;
+/// Segment's max doc must be `< MAX_DOC_LIMIT`.
+///
+/// We do not allow segments with more than
+pub const MAX_DOC_LIMIT: u32 = 1 << 31;

 /// Computes the number of bits that will be used for bitpacking.
 ///
@@ -52,11 +56,6 @@ pub(crate) fn is_power_of_2(n: usize) -> bool {
     (n > 0) && (n & (n - 1) == 0)
 }

-/// Create a default io error given a string.
-pub(crate) fn make_io_err(msg: String) -> io::Error {
-    io::Error::new(io::ErrorKind::Other, msg)
-}
-
 /// Has length trait
 pub trait HasLen {
     /// Return length
@@ -134,4 +133,11 @@ pub(crate) mod test {
         assert_eq!(compute_num_bits(256), 9u8);
         assert_eq!(compute_num_bits(5_000_000_000), 33u8);
     }
+
+    #[test]
+    fn test_max_doc() {
+        // this is the first time I write a unit test for a constant.
+        assert!(((super::MAX_DOC_LIMIT - 1) as i32) >= 0);
+        assert!((super::MAX_DOC_LIMIT as i32) < 0);
+    }
 }
```
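An aside on why `test_max_doc` holds: `1 << 31` is exactly the sign bit of a 32-bit integer, so `MAX_DOC_LIMIT - 1` is the largest value that still fits in an `i32`, while `MAX_DOC_LIMIT` itself reinterprets as a negative number. A minimal standalone check (not part of the diff):

```rust
fn main() {
    const MAX_DOC_LIMIT: u32 = 1 << 31; // 2_147_483_648

    // The largest legal doc count, MAX_DOC_LIMIT - 1, fits in an i32 ...
    assert!(((MAX_DOC_LIMIT - 1) as i32) >= 0);
    // ... while MAX_DOC_LIMIT itself wraps around to i32::MIN.
    assert_eq!(MAX_DOC_LIMIT as i32, i32::MIN);

    println!("doc ids stay in 0..{}", MAX_DOC_LIMIT);
}
```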
```diff
@@ -24,6 +24,7 @@ use schema::Schema;
 use serde_json;
 use std::borrow::BorrowMut;
 use std::fmt;
+#[cfg(feature = "mmap")]
 use std::path::Path;
 use std::sync::Arc;
 use tokenizer::BoxedTokenizer;
@@ -355,10 +356,8 @@ mod tests {
     use directory::RAMDirectory;
     use schema::Field;
     use schema::{Schema, INDEXED, TEXT};
-    use std::path::PathBuf;
     use std::thread;
     use std::time::Duration;
-    use tempdir::TempDir;
     use Index;
     use IndexReader;
     use IndexWriter;
@@ -444,61 +443,69 @@ mod tests {
         test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
     }

-    #[test]
-    fn test_index_on_commit_reload_policy_mmap() {
-        let schema = throw_away_schema();
-        let field = schema.get_field("num_likes").unwrap();
-        let tempdir = TempDir::new("index").unwrap();
-        let tempdir_path = PathBuf::from(tempdir.path());
-        let index = Index::create_in_dir(&tempdir_path, schema).unwrap();
-        let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
-        writer.commit().unwrap();
-        let reader = index
-            .reader_builder()
-            .reload_policy(ReloadPolicy::OnCommit)
-            .try_into()
-            .unwrap();
-        assert_eq!(reader.searcher().num_docs(), 0);
-        test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
-    }
-
-    #[test]
-    fn test_index_manual_policy_mmap() {
-        let schema = throw_away_schema();
-        let field = schema.get_field("num_likes").unwrap();
-        let index = Index::create_from_tempdir(schema).unwrap();
-        let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
-        writer.commit().unwrap();
-        let reader = index
-            .reader_builder()
-            .reload_policy(ReloadPolicy::Manual)
-            .try_into()
-            .unwrap();
-        assert_eq!(reader.searcher().num_docs(), 0);
-        writer.add_document(doc!(field=>1u64));
-        writer.commit().unwrap();
-        thread::sleep(Duration::from_millis(500));
-        assert_eq!(reader.searcher().num_docs(), 0);
-        reader.reload().unwrap();
-        assert_eq!(reader.searcher().num_docs(), 1);
-    }
-
-    #[test]
-    fn test_index_on_commit_reload_policy_different_directories() {
-        let schema = throw_away_schema();
-        let field = schema.get_field("num_likes").unwrap();
-        let tempdir = TempDir::new("index").unwrap();
-        let tempdir_path = PathBuf::from(tempdir.path());
-        let write_index = Index::create_in_dir(&tempdir_path, schema).unwrap();
-        let read_index = Index::open_in_dir(&tempdir_path).unwrap();
-        let reader = read_index
-            .reader_builder()
-            .reload_policy(ReloadPolicy::OnCommit)
-            .try_into()
-            .unwrap();
-        assert_eq!(reader.searcher().num_docs(), 0);
-        let mut writer = write_index.writer_with_num_threads(1, 3_000_000).unwrap();
-        test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
-    }
+    #[cfg(feature = "mmap")]
+    mod mmap_specific {
+
+        use super::*;
+        use std::path::PathBuf;
+        use tempdir::TempDir;
+
+        #[test]
+        fn test_index_on_commit_reload_policy_mmap() {
+            let schema = throw_away_schema();
+            let field = schema.get_field("num_likes").unwrap();
+            let tempdir = TempDir::new("index").unwrap();
+            let tempdir_path = PathBuf::from(tempdir.path());
+            let index = Index::create_in_dir(&tempdir_path, schema).unwrap();
+            let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+            writer.commit().unwrap();
+            let reader = index
+                .reader_builder()
+                .reload_policy(ReloadPolicy::OnCommit)
+                .try_into()
+                .unwrap();
+            assert_eq!(reader.searcher().num_docs(), 0);
+            test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
+        }
+
+        #[test]
+        fn test_index_manual_policy_mmap() {
+            let schema = throw_away_schema();
+            let field = schema.get_field("num_likes").unwrap();
+            let index = Index::create_from_tempdir(schema).unwrap();
+            let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+            writer.commit().unwrap();
+            let reader = index
+                .reader_builder()
+                .reload_policy(ReloadPolicy::Manual)
+                .try_into()
+                .unwrap();
+            assert_eq!(reader.searcher().num_docs(), 0);
+            writer.add_document(doc!(field=>1u64));
+            writer.commit().unwrap();
+            thread::sleep(Duration::from_millis(500));
+            assert_eq!(reader.searcher().num_docs(), 0);
+            reader.reload().unwrap();
+            assert_eq!(reader.searcher().num_docs(), 1);
+        }
+
+        #[test]
+        fn test_index_on_commit_reload_policy_different_directories() {
+            let schema = throw_away_schema();
+            let field = schema.get_field("num_likes").unwrap();
+            let tempdir = TempDir::new("index").unwrap();
+            let tempdir_path = PathBuf::from(tempdir.path());
+            let write_index = Index::create_in_dir(&tempdir_path, schema).unwrap();
+            let read_index = Index::open_in_dir(&tempdir_path).unwrap();
+            let reader = read_index
+                .reader_builder()
+                .reload_policy(ReloadPolicy::OnCommit)
+                .try_into()
+                .unwrap();
+            assert_eq!(reader.searcher().num_docs(), 0);
+            let mut writer = write_index.writer_with_num_threads(1, 3_000_000).unwrap();
+            test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
+        }
+    }
 }

 fn test_index_on_commit_reload_policy_aux(
```
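The refactor above is the standard trick for feature-gated tests: instead of tagging every test with `#[cfg(feature = "mmap")]`, gate one inner module and scope the feature-only imports to it. A minimal sketch of the shape:

```rust
#[cfg(test)]
mod tests {
    // Tests that run regardless of features live here.

    #[cfg(feature = "mmap")]
    mod mmap_specific {
        // Imports needed only by these tests stay scoped to the module,
        // so they cause no unused-import warnings when the feature is off.
        use std::path::PathBuf;

        #[test]
        fn runs_only_with_the_mmap_feature() {
            let p = PathBuf::from("index");
            assert_eq!(p.to_str(), Some("index"));
        }
    }
}
```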
```diff
@@ -205,6 +205,16 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
     /// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
     /// `OnCommit` `ReloadPolicy` to work properly.
     fn watch(&self, watch_callback: WatchCallback) -> WatchHandle;
+
+    /// Ensures that all volatile files are persisted (in directories where that makes sense).
+    ///
+    /// In order to make Near Real Time efficient, tantivy introduces the notion of soft_commit vs
+    /// commit. Commit will call `.flush()`, while soft commit won't.
+    ///
+    /// `meta.json` should be the last file to be flushed.
+    fn flush(&self) -> io::Result<()> {
+        Ok(())
+    }
 }

 /// DirectoryClone
```
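The contract above separates visibility from durability. A sketch of how a caller might honor it, assuming a pared-down `Directory` trait and a hypothetical `commit` helper (neither is tantivy's exact API):

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::io;

trait Directory {
    fn atomic_write(&self, path: &str, data: &[u8]) -> io::Result<()>;
    fn flush(&self) -> io::Result<()> {
        Ok(()) // default no-op, as in the diff above
    }
}

// In-memory directory: nothing to fsync, so the default flush is right.
#[derive(Default)]
struct RamDir(RefCell<HashMap<String, Vec<u8>>>);

impl Directory for RamDir {
    fn atomic_write(&self, path: &str, data: &[u8]) -> io::Result<()> {
        self.0.borrow_mut().insert(path.to_string(), data.to_vec());
        Ok(())
    }
}

// Hypothetical commit helper: a hard commit flushes segment files first
// and publishes meta.json last, so metadata never points at missing data;
// a soft commit skips the flush entirely.
fn commit(dir: &dyn Directory, meta_json: &[u8], soft: bool) -> io::Result<()> {
    if !soft {
        dir.flush()?;
    }
    dir.atomic_write("meta.json", meta_json)
}

fn main() -> io::Result<()> {
    let dir = RamDir::default();
    commit(&dir, b"{\"segments\": []}", true)?; // soft: skip the fsync
    commit(&dir, b"{\"segments\": []}", false)?; // hard: flush, then publish
    Ok(())
}
```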
```diff
@@ -260,95 +260,98 @@ impl Clone for ManagedDirectory {
 #[cfg(test)]
 mod tests {

-    use super::*;
     #[cfg(feature = "mmap")]
-    use directory::MmapDirectory;
-    use std::io::Write;
-    use std::path::Path;
-    use tempdir::TempDir;
-
-    lazy_static! {
-        static ref TEST_PATH1: &'static Path = Path::new("some_path_for_test");
-        static ref TEST_PATH2: &'static Path = Path::new("some_path_for_test2");
-    }
-
-    #[test]
-    #[cfg(feature = "mmap")]
-    fn test_managed_directory() {
-        let tempdir = TempDir::new("index").unwrap();
-        let tempdir_path = PathBuf::from(tempdir.path());
-        {
-            let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
-            let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
-            {
-                let mut write_file = managed_directory.open_write(*TEST_PATH1).unwrap();
-                write_file.flush().unwrap();
-            }
-            {
-                managed_directory
-                    .atomic_write(*TEST_PATH2, &vec![0u8, 1u8])
-                    .unwrap();
-            }
-            {
-                assert!(managed_directory.exists(*TEST_PATH1));
-                assert!(managed_directory.exists(*TEST_PATH2));
-            }
-            {
-                let living_files: HashSet<PathBuf> =
-                    [TEST_PATH1.to_owned()].into_iter().cloned().collect();
-                managed_directory.garbage_collect(|| living_files);
-            }
-            {
-                assert!(managed_directory.exists(*TEST_PATH1));
-                assert!(!managed_directory.exists(*TEST_PATH2));
-            }
-        }
-        {
-            let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
-            let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
-            {
-                assert!(managed_directory.exists(*TEST_PATH1));
-                assert!(!managed_directory.exists(*TEST_PATH2));
-            }
-            {
-                let living_files: HashSet<PathBuf> = HashSet::new();
-                managed_directory.garbage_collect(|| living_files);
-            }
-            {
-                assert!(!managed_directory.exists(*TEST_PATH1));
-                assert!(!managed_directory.exists(*TEST_PATH2));
-            }
-        }
-    }
-
-    #[test]
-    #[cfg(feature = "mmap ")]
-    fn test_managed_directory_gc_while_mmapped() {
-        let tempdir = TempDir::new("index").unwrap();
-        let tempdir_path = PathBuf::from(tempdir.path());
-        let living_files = HashSet::new();
-
-        let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
-        let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
-        managed_directory
-            .atomic_write(*TEST_PATH1, &vec![0u8, 1u8])
-            .unwrap();
-        assert!(managed_directory.exists(*TEST_PATH1));
-
-        let _mmap_read = managed_directory.open_read(*TEST_PATH1).unwrap();
-        managed_directory.garbage_collect(|| living_files.clone());
-        if cfg!(target_os = "windows") {
-            // On Windows, gc should try and fail to delete the file as it is mmapped.
-            assert!(managed_directory.exists(*TEST_PATH1));
-            // unmap should happen here.
-            drop(_mmap_read);
-            // The file should still be in the list of managed files and
-            // eventually be deleted once the mmap is released.
-            managed_directory.garbage_collect(|| living_files);
-            assert!(!managed_directory.exists(*TEST_PATH1));
-        } else {
-            assert!(!managed_directory.exists(*TEST_PATH1));
-        }
-    }
+    mod mmap_specific {
+
+        use super::super::*;
+        use std::path::Path;
+        use tempdir::TempDir;
+
+        lazy_static! {
+            static ref TEST_PATH1: &'static Path = Path::new("some_path_for_test");
+            static ref TEST_PATH2: &'static Path = Path::new("some_path_for_test2");
+        }
+
+        use directory::MmapDirectory;
+        use std::io::Write;
+
+        #[test]
+        fn test_managed_directory() {
+            let tempdir = TempDir::new("index").unwrap();
+            let tempdir_path = PathBuf::from(tempdir.path());
+            {
+                let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
+                let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
+                {
+                    let mut write_file = managed_directory.open_write(*TEST_PATH1).unwrap();
+                    write_file.flush().unwrap();
+                }
+                {
+                    managed_directory
+                        .atomic_write(*TEST_PATH2, &vec![0u8, 1u8])
+                        .unwrap();
+                }
+                {
+                    assert!(managed_directory.exists(*TEST_PATH1));
+                    assert!(managed_directory.exists(*TEST_PATH2));
+                }
+                {
+                    let living_files: HashSet<PathBuf> =
+                        [TEST_PATH1.to_owned()].into_iter().cloned().collect();
+                    managed_directory.garbage_collect(|| living_files);
+                }
+                {
+                    assert!(managed_directory.exists(*TEST_PATH1));
+                    assert!(!managed_directory.exists(*TEST_PATH2));
+                }
+            }
+            {
+                let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
+                let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
+                {
+                    assert!(managed_directory.exists(*TEST_PATH1));
+                    assert!(!managed_directory.exists(*TEST_PATH2));
+                }
+                {
+                    let living_files: HashSet<PathBuf> = HashSet::new();
+                    managed_directory.garbage_collect(|| living_files);
+                }
+                {
+                    assert!(!managed_directory.exists(*TEST_PATH1));
+                    assert!(!managed_directory.exists(*TEST_PATH2));
+                }
+            }
+        }
+
+        #[test]
+        fn test_managed_directory_gc_while_mmapped() {
+            let tempdir = TempDir::new("index").unwrap();
+            let tempdir_path = PathBuf::from(tempdir.path());
+            let living_files = HashSet::new();
+
+            let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
+            let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
+            managed_directory
+                .atomic_write(*TEST_PATH1, &vec![0u8, 1u8])
+                .unwrap();
+            assert!(managed_directory.exists(*TEST_PATH1));
+
+            let _mmap_read = managed_directory.open_read(*TEST_PATH1).unwrap();
+            managed_directory.garbage_collect(|| living_files.clone());
+            if cfg!(target_os = "windows") {
+                // On Windows, gc should try and fail to delete the file as it is mmapped.
+                assert!(managed_directory.exists(*TEST_PATH1));
+                // unmap should happen here.
+                drop(_mmap_read);
+                // The file should still be in the list of managed files and
+                // eventually be deleted once the mmap is released.
+                managed_directory.garbage_collect(|| living_files);
+                assert!(!managed_directory.exists(*TEST_PATH1));
+            } else {
+                assert!(!managed_directory.exists(*TEST_PATH1));
+            }
+        }
+    }
 }
```
```diff
@@ -6,7 +6,6 @@ use self::notify::RawEvent;
 use self::notify::RecursiveMode;
 use self::notify::Watcher;
 use atomicwrites;
-use common::make_io_err;
 use core::META_FILEPATH;
 use directory::error::LockError;
 use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
@@ -37,6 +36,11 @@ use std::sync::Weak;
 use std::thread;
 use tempdir::TempDir;

+/// Create a default io error given a string.
+pub(crate) fn make_io_err(msg: String) -> io::Error {
+    io::Error::new(io::ErrorKind::Other, msg)
+}
+
 /// Returns None iff the file exists, can be read, but is empty (and hence
 /// cannot be mmapped)
 fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
@@ -364,7 +368,7 @@ impl Drop for ReleaseLockFile {

 /// This Write wraps a File, but has the specificity of
 /// calling `sync_all` on flush.
-struct SafeFileWriter(File);
+pub struct SafeFileWriter(File);

 impl SafeFileWriter {
     fn new(file: File) -> SafeFileWriter {
```
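The `SafeFileWriter` made `pub` above is a useful pattern on its own: flushing a buffered writer only empties userspace buffers, while durability additionally requires an fsync. A sketch of the idea (not tantivy's exact code):

```rust
use std::fs::File;
use std::io::{self, Write};

/// Wraps a File so that flush() also forces the OS to persist the data
/// (sync_all is roughly fsync), not merely empty userspace buffers.
struct SafeFileWriter(File);

impl Write for SafeFileWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()?;
        self.0.sync_all()
    }
}

fn main() -> io::Result<()> {
    let file = File::create("example.out")?;
    let mut w = SafeFileWriter(file);
    w.write_all(b"durable bytes")?;
    w.flush()?; // data is on disk after this returns, not just buffered
    Ok(())
}
```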
```diff
@@ -13,6 +13,7 @@ mod managed_directory;
 mod ram_directory;
 mod read_only_source;
 mod watch_event_router;
+mod nrt_directory;

 /// Errors specific to the directory module.
 pub mod error;
```
src/directory/nrt_directory.rs (new file, 195 lines)

```rust
use directory::Directory;
use std::path::{PathBuf, Path};
use directory::ReadOnlySource;
use directory::error::OpenReadError;
use directory::error::DeleteError;
use std::io::{BufWriter, Cursor};
use directory::SeekableWrite;
use directory::error::OpenWriteError;
use directory::WatchHandle;
use directory::ram_directory::InnerRamDirectory;
use std::sync::RwLock;
use std::sync::Arc;
use directory::WatchCallback;
use std::fmt;
use std::io;
use std::io::{Seek, Write};
use directory::DirectoryClone;

const BUFFER_LEN: usize = 1_000_000;

pub enum NRTWriter {
    InRam {
        buffer: Cursor<Vec<u8>>,
        path: PathBuf,
        nrt_directory: NRTDirectory
    },
    UnderlyingFile(BufWriter<Box<SeekableWrite>>)
}

impl NRTWriter {
    pub fn new(path: PathBuf, nrt_directory: NRTDirectory) -> NRTWriter {
        NRTWriter::InRam {
            buffer: Cursor::new(Vec::with_capacity(BUFFER_LEN)),
            path,
            nrt_directory,
        }
    }
}

impl io::Seek for NRTWriter {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        match self {
            NRTWriter::InRam { buffer, path, nrt_directory } => {
                buffer.seek(pos)
            }
            NRTWriter::UnderlyingFile(file) => {
                file.seek(pos)
            }
        }
    }
}

impl io::Write for NRTWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_all(buf)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        match self {
            NRTWriter::InRam { buffer, path, nrt_directory } => {
                let mut cache_wlock = nrt_directory.cache.write().unwrap();
                cache_wlock.write(path.clone(), buffer.get_ref());
                Ok(())
            }
            NRTWriter::UnderlyingFile(file) => {
                file.flush()
            }
        }
    }

    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        // Working around the borrow checker.
        let mut underlying_write_opt: Option<BufWriter<Box<SeekableWrite>>> = None;
        if let NRTWriter::InRam { buffer, path, nrt_directory } = self {
            if buffer.get_ref().len() + buf.len() > BUFFER_LEN {
                // We can't keep this in RAM. Let's move it to the underlying directory.
                underlying_write_opt = Some(nrt_directory.open_write(path)
                    .map_err(|open_err| {
                        io::Error::new(io::ErrorKind::Other, open_err)
                    })?);
            }
        }
        if let Some(underlying_write) = underlying_write_opt {
            *self = NRTWriter::UnderlyingFile(underlying_write);
        }
        match self {
            NRTWriter::InRam { buffer, path, nrt_directory } => {
                assert!(buffer.get_ref().len() + buf.len() <= BUFFER_LEN);
                buffer.write_all(buf)
            }
            NRTWriter::UnderlyingFile(file) => {
                file.write_all(buf)
            }
        }
    }
}

pub struct NRTDirectory {
    underlying: Box<Directory>,
    cache: Arc<RwLock<InnerRamDirectory>>,
}

impl Clone for NRTDirectory {
    fn clone(&self) -> Self {
        NRTDirectory {
            underlying: self.underlying.box_clone(),
            cache: self.cache.clone()
        }
    }
}

impl NRTDirectory {
    fn wrap(underlying: Box<Directory>) -> NRTDirectory {
        NRTDirectory {
            underlying,
            cache: Default::default()
        }
    }
}

impl fmt::Debug for NRTDirectory {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "NRTDirectory({:?})", self.underlying)
    }
}

impl Directory for NRTDirectory {
    fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
        unimplemented!()
    }

    fn delete(&self, path: &Path) -> Result<(), DeleteError> {
        // We explicitly release the lock, to prevent a panic on the underlying directory
        // to poison the lock.
        //
        // File can only go from cache to underlying so the result does not lead to
        // any inconsistency.
        {
            let mut cache_wlock = self.cache.write().unwrap();
            if cache_wlock.exists(path) {
                return cache_wlock.delete(path);
            }
        }
        self.underlying.delete(path)
    }

    fn exists(&self, path: &Path) -> bool {
        // We explicitly release the lock, to prevent a panic on the underlying directory
        // to poison the lock.
        //
        // File can only go from cache to underlying so the result does not lead to
        // any inconsistency.
        {
            let rlock_cache = self.cache.read().unwrap();
            if rlock_cache.exists(path) {
                return true;
            }
        }
        self.underlying.exists(path)
    }

    fn open_write(&mut self, path: &Path) -> Result<BufWriter<Box<SeekableWrite>>, OpenWriteError> {
        let mut cache_wlock = self.cache.write().unwrap();
        // TODO might poison our lock. I don't have a sound solution yet.
        let path_buf = path.to_owned();
        if self.underlying.exists(path) {
            return Err(OpenWriteError::FileAlreadyExists(path_buf));
        }
        // force the creation of the file to mimic the MMap directory.
        let exists = cache_wlock.write(path_buf.clone(), &[]);
        if exists {
            Err(OpenWriteError::FileAlreadyExists(path_buf))
        } else {
            let vec_writer = NRTWriter::new(path_buf.clone(), self.clone());
            Ok(BufWriter::new(Box::new(vec_writer)))
        }
    }

    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
        self.underlying.atomic_read(path)
    }

    fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
        self.underlying.atomic_write(path, data)
    }

    fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
        self.underlying.watch(watch_callback)
    }
}
```
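The core idea of `NRTWriter` — buffer in RAM while the file stays small, move everything to the underlying directory once a write would exceed `BUFFER_LEN` — in a self-contained sketch (names and types here are illustrative, not tantivy's API):

```rust
use std::io;

// Illustrative spill-over writer: keeps bytes in RAM until a threshold,
// then hands everything to a fallback writer (a Vec stands in for a file
// opened in the underlying directory).
const BUFFER_LEN: usize = 16; // tiny threshold so the demo actually spills

enum SpillWriter {
    InRam(Vec<u8>),
    Spilled(Vec<u8>), // stand-in for the on-disk writer
}

impl SpillWriter {
    fn write_bytes(&mut self, buf: &[u8]) -> io::Result<()> {
        if let SpillWriter::InRam(ram) = self {
            if ram.len() + buf.len() > BUFFER_LEN {
                // Same trick as NRTWriter::write_all: replace self, moving
                // the buffered prefix into the fallback writer.
                let mut spilled = std::mem::take(ram);
                spilled.extend_from_slice(buf);
                *self = SpillWriter::Spilled(spilled);
                return Ok(());
            }
        }
        match self {
            SpillWriter::InRam(ram) => ram.extend_from_slice(buf),
            SpillWriter::Spilled(out) => out.extend_from_slice(buf),
        }
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut w = SpillWriter::InRam(Vec::new());
    w.write_bytes(b"small")?; // 5 bytes: stays in RAM
    w.write_bytes(b"a much larger chunk")?; // would exceed 16: spills
    assert!(matches!(w, SpillWriter::Spilled(_)));
    Ok(())
}
```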
```diff
@@ -71,36 +71,36 @@ impl Write for VecWriter {
 }

 #[derive(Default)]
-struct InnerDirectory {
+pub(crate) struct InnerRamDirectory {
     fs: HashMap<PathBuf, ReadOnlySource>,
     watch_router: WatchCallbackList,
 }

-impl InnerDirectory {
-    fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
+impl InnerRamDirectory {
+    pub fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
         let data = ReadOnlySource::new(Vec::from(data));
         self.fs.insert(path, data).is_some()
     }

-    fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
+    pub fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
         self.fs
             .get(path)
             .ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
             .map(|el| el.clone())
     }

-    fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> {
+    pub fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> {
         match self.fs.remove(path) {
             Some(_) => Ok(()),
             None => Err(DeleteError::FileDoesNotExist(PathBuf::from(path))),
         }
     }

-    fn exists(&self, path: &Path) -> bool {
+    pub fn exists(&self, path: &Path) -> bool {
         self.fs.contains_key(path)
     }

-    fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
+    pub fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
         self.watch_router.subscribe(watch_handle)
     }
 }
@@ -118,7 +118,7 @@ impl fmt::Debug for RAMDirectory {
 ///
 #[derive(Clone, Default)]
 pub struct RAMDirectory {
-    fs: Arc<RwLock<InnerDirectory>>,
+    fs: Arc<RwLock<InnerRamDirectory>>,
 }

 impl RAMDirectory {
```
```diff
@@ -13,7 +13,6 @@ fn check_index_content(searcher: &Searcher, vals: &HashSet<u64>) {

 #[test]
 #[ignore]
-#[cfg(feature = "mmap")]
 fn test_indexing() {
     let mut schema_builder = Schema::builder();
```
```diff
@@ -179,6 +179,11 @@ pub struct DeleteCursor {
 }

 impl DeleteCursor {
+
+    pub fn empty() -> DeleteCursor {
+        DeleteQueue::new().cursor()
+    }
+
     /// Skips operations and position it so that
     /// - either all of the delete operation currently in the
     ///   queue are consume and the next get will return None.
```
```diff
@@ -259,7 +259,7 @@ pub fn advance_deletes(
             write_delete_bitset(&delete_bitset, &mut delete_file)?;
         }
     }
-    segment_entry.set_meta(segment.meta().clone());
+    segment_entry.set_meta(target_opstamp, segment.meta().clone());
     Ok(())
 }
```
```diff
@@ -326,7 +326,12 @@ fn index_documents(
         // to even open the segment.
         None
     };
-    let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
+    let segment_entry = SegmentEntry::new(
+        segment_meta,
+        delete_cursor,
+        delete_bitset_opt,
+        last_docstamp,
+    );
     Ok(segment_updater.add_segment(generation, segment_entry))
 }
```
```diff
@@ -361,9 +366,9 @@ impl IndexWriter {
     }

     #[doc(hidden)]
-    pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
+    pub fn add_segment(&mut self, segment_meta: SegmentMeta, opstamp: u64) {
         let delete_cursor = self.delete_queue.cursor();
-        let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
+        let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None, opstamp);
         self.segment_updater
             .add_segment(self.generation, segment_entry);
     }
```
```diff
@@ -527,7 +532,7 @@ impl IndexWriter {
         //
         // This will reach an end as the only document_sender
         // was dropped with the index_writer.
-        for _ in document_receiver.clone() {}
+        for _ in document_receiver.iter() {}

         Ok(())
     }
```
```diff
@@ -554,6 +559,16 @@ impl IndexWriter {
     /// using this API.
     /// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
     pub fn prepare_commit(&mut self) -> Result<PreparedCommit> {
+        info!("Preparing commit");
+        self.prepare_commit_internal(false)
+    }
+
+    pub fn prepare_commit_soft(&mut self) -> Result<PreparedCommit> {
+        info!("Preparing soft commit");
+        self.prepare_commit_internal(true)
+    }
+
+    pub(crate) fn prepare_commit_internal(&mut self, soft: bool) -> Result<PreparedCommit> {
         // Here, because we join all of the worker threads,
         // all of the segment update for this commit have been
         // sent.
@@ -576,13 +591,13 @@ impl IndexWriter {
         let indexing_worker_result = worker_handle
             .join()
             .map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
-        indexing_worker_result?;
-        // add a new worker for the next generation.
+        // add a new worker for the next generation, whether the worker failed or not.
         self.add_indexing_worker()?;
+        indexing_worker_result?;
     }

     let commit_opstamp = self.stamper.stamp();
-    let prepared_commit = PreparedCommit::new(self, commit_opstamp);
+    let prepared_commit = PreparedCommit::new(self, commit_opstamp, soft);
     info!("Prepared commit {}", commit_opstamp);
     Ok(prepared_commit)
 }
```
```diff
@@ -605,6 +620,11 @@ impl IndexWriter {
         self.prepare_commit()?.commit()
     }

+    pub fn soft_commit(&mut self) -> Result<u64> {
+        self.prepare_commit_soft()?.commit()
+    }
+
     pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
         &self.segment_updater
     }
```
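A hedged usage sketch of the API this branch is building. `soft_commit` is a work-in-progress method on this branch, not a released tantivy API, so treat the call below as hypothetical:

```rust
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let text = schema_builder.add_text_field("text", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let reader = index.reader()?;
    let mut writer = index.writer(3_000_000)?;

    writer.add_document(doc!(text => "hello"));
    writer.soft_commit()?; // hypothetical: visible after reload, not yet durable
    reader.reload()?;
    assert_eq!(reader.searcher().num_docs(), 1);

    writer.commit()?; // hard commit: flush() reaches the Directory
    Ok(())
}
```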
```diff
@@ -732,6 +752,7 @@ mod tests {
     use Index;
     use ReloadPolicy;
     use Term;
+    use IndexReader;

     #[test]
     fn test_operations_group() {
@@ -865,6 +886,13 @@ mod tests {
         let _index_writer_two = index.writer(3_000_000).unwrap();
     }

+    fn num_docs_containing_text(reader: &IndexReader, term: &str) -> u64 {
+        let searcher = reader.searcher();
+        let text_field = reader.schema().get_field("text").unwrap();
+        let term = Term::from_field_text(text_field, term);
+        searcher.doc_freq(&term)
+    }
+
     #[test]
     fn test_commit_and_rollback() {
         let mut schema_builder = schema::Schema::builder();
@@ -881,9 +909,12 @@ mod tests {
             searcher.doc_freq(&term)
         };

+        let mut index_writer = index.writer(3_000_000).unwrap();
+
+        assert_eq!(index_writer.commit_opstamp(), 0u64);
+        assert_eq!(num_docs_containing_text(&reader, "a"), 0);
         {
             // writing the segment
-            let mut index_writer = index.writer(3_000_000).unwrap();
             index_writer.add_document(doc!(text_field=>"a"));
             index_writer.rollback().unwrap();
             assert_eq!(index_writer.commit_opstamp(), 0u64);
@@ -902,6 +933,35 @@ mod tests {
         reader.searcher();
     }

+    #[test]
+    fn test_softcommit_and_rollback() {
+        let mut schema_builder = schema::Schema::builder();
+        let text_field = schema_builder.add_text_field("text", schema::TEXT);
+        let index = Index::create_in_ram(schema_builder.build());
+        let reader = index.reader().unwrap();
+        // writing the segment
+        let mut index_writer = index.writer(3_000_000).unwrap();
+        index_writer.add_document(doc!(text_field=>"a"));
+        index_writer.rollback().unwrap();
+
+        assert_eq!(index_writer.commit_opstamp(), 0u64);
+        assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
+        {
+            index_writer.add_document(doc!(text_field=>"b"));
+            index_writer.add_document(doc!(text_field=>"c"));
+        }
+        assert!(index_writer.soft_commit().is_ok());
+        reader.reload().unwrap(); // we need to load soft committed stuff.
+        assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
+        assert_eq!(num_docs_containing_text(&reader, "b"), 1u64);
+        assert_eq!(num_docs_containing_text(&reader, "c"), 1u64);
+        index_writer.rollback().unwrap();
+        reader.reload().unwrap();
+        assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
+        assert_eq!(num_docs_containing_text(&reader, "b"), 0u64);
+        assert_eq!(num_docs_containing_text(&reader, "c"), 0u64);
+    }
+
     #[test]
     fn test_with_merges() {
         let mut schema_builder = schema::Schema::builder();
@@ -935,7 +995,7 @@ mod tests {

         reader.reload().unwrap();

-        assert_eq!(num_docs_containing("a"), 200);
+        assert_eq!(num_docs_containing_text(&reader, "a"), 200);
         assert!(index.searchable_segments().unwrap().len() < 8);
     }
 }
@@ -978,7 +1038,7 @@ mod tests {
     let mut schema_builder = schema::Schema::builder();
     let text_field = schema_builder.add_text_field("text", schema::TEXT);
     let index = Index::create_in_ram(schema_builder.build());
+    let reader = index.reader();
     {
         // writing the segment
         let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
```
```diff
@@ -1,3 +1,4 @@
+use common::MAX_DOC_LIMIT;
 use core::Segment;
 use core::SegmentReader;
 use core::SerializableSegment;
@@ -23,6 +24,7 @@ use termdict::TermMerger;
 use termdict::TermOrdinal;
 use DocId;
 use Result;
+use TantivyError;

 fn compute_total_num_tokens(readers: &[SegmentReader], field: Field) -> u64 {
     let mut total_tokens = 0u64;
@@ -150,6 +152,14 @@ impl IndexMerger {
                 readers.push(reader);
             }
         }
+        if max_doc >= MAX_DOC_LIMIT {
+            let err_msg = format!(
+                "The segment resulting from this merge would have {} docs,\
+                 which exceeds the limit {}.",
+                max_doc, MAX_DOC_LIMIT
+            );
+            return Err(TantivyError::InvalidArgument(err_msg));
+        }
         Ok(IndexMerger {
             schema,
             readers,
```
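An illustrative version of the guard added above, with hypothetical helper names: the summed doc counts of a merge's source segments must stay below `MAX_DOC_LIMIT` (`1 << 31` = 2,147,483,648), and summing in `u64` avoids overflowing `u32` while checking:

```rust
const MAX_DOC_LIMIT: u32 = 1 << 31;

// Hypothetical stand-in for the merge pre-check; not tantivy's signature.
fn check_merge(doc_counts: &[u32]) -> Result<u32, String> {
    // Sum in u64 so the check itself cannot overflow.
    let total: u64 = doc_counts.iter().map(|&n| n as u64).sum();
    if total >= MAX_DOC_LIMIT as u64 {
        return Err(format!(
            "The segment resulting from this merge would have {} docs, \
             which exceeds the limit {}.",
            total, MAX_DOC_LIMIT
        ));
    }
    Ok(total as u32)
}

fn main() {
    assert!(check_merge(&[1_000, 2_000]).is_ok());
    assert!(check_merge(&[u32::MAX, u32::MAX]).is_err());
}
```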
```diff
@@ -6,14 +6,20 @@ pub struct PreparedCommit<'a> {
     index_writer: &'a mut IndexWriter,
     payload: Option<String>,
     opstamp: u64,
+    soft: bool,
 }

 impl<'a> PreparedCommit<'a> {
-    pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
+    pub(crate) fn new(
+        index_writer: &'a mut IndexWriter,
+        opstamp: u64,
+        soft: bool,
+    ) -> PreparedCommit {
         PreparedCommit {
             index_writer,
             payload: None,
             opstamp,
+            soft,
         }
     }

@@ -33,7 +39,7 @@ impl<'a> PreparedCommit<'a> {
         info!("committing {}", self.opstamp);
         self.index_writer
             .segment_updater()
-            .commit(self.opstamp, self.payload)?;
+            .commit(self.opstamp, self.payload, self.soft)?;
         Ok(self.opstamp)
     }
 }
```
```diff
@@ -22,6 +22,7 @@ pub struct SegmentEntry {
     meta: SegmentMeta,
     delete_bitset: Option<BitSet>,
     delete_cursor: DeleteCursor,
+    opstamp: u64,
 }

 impl SegmentEntry {
@@ -30,14 +31,20 @@ impl SegmentEntry {
         segment_meta: SegmentMeta,
         delete_cursor: DeleteCursor,
         delete_bitset: Option<BitSet>,
+        opstamp: u64,
     ) -> SegmentEntry {
         SegmentEntry {
             meta: segment_meta,
             delete_bitset,
             delete_cursor,
+            opstamp,
         }
     }

+    pub fn opstamp(&self) -> u64 {
+        self.opstamp
+    }
+
     /// Return a reference to the segment entry deleted bitset.
     ///
     /// `DocId` in this bitset are flagged as deleted.
@@ -46,7 +53,8 @@ impl SegmentEntry {
     }

     /// Set the `SegmentMeta` for this segment.
-    pub fn set_meta(&mut self, segment_meta: SegmentMeta) {
+    pub fn set_meta(&mut self, opstamp: u64, segment_meta: SegmentMeta) {
+        self.opstamp = opstamp;
         self.meta = segment_meta;
     }
```
@@ -11,11 +11,47 @@ use std::path::PathBuf;
|
|||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
|
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
|
||||||
use Result as TantivyResult;
|
use Result as TantivyResult;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
/// Provides a read-only view of the available segments.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AvailableSegments {
|
||||||
|
registers: Arc<RwLock<SegmentRegisters>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AvailableSegments {
|
||||||
|
pub fn committed(&self) -> Vec<SegmentMeta> {
|
||||||
|
self.registers
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.committed
|
||||||
|
.segment_metas()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn soft_committed(&self) -> Vec<SegmentMeta> {
|
||||||
|
self.registers
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.soft_committed
|
||||||
|
.segment_metas()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct SegmentRegisters {
|
struct SegmentRegisters {
|
||||||
uncommitted: SegmentRegister,
|
uncommitted: HashMap<SegmentId, SegmentEntry>,
|
||||||
committed: SegmentRegister,
|
committed: SegmentRegister,
|
||||||
|
/// soft commits can advance committed segment to a future delete
|
||||||
|
/// opstamp.
|
||||||
|
///
|
||||||
|
/// In that case the same `SegmentId` can appear in both `committed`
|
||||||
|
/// and in `committed_in_the_future`.
|
||||||
|
///
|
||||||
|
/// We do not consider these segments for merges.
|
||||||
|
soft_committed: SegmentRegister,
|
||||||
|
/// `DeleteCursor`, positionned on the soft commit.
|
||||||
|
delete_cursor: DeleteCursor,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The segment manager stores the list of segments
|
/// The segment manager stores the list of segments
|
||||||
@@ -23,9 +59,8 @@ struct SegmentRegisters {
|
|||||||
///
|
///
|
||||||
/// It guarantees the atomicity of the
|
/// It guarantees the atomicity of the
|
||||||
/// changes (merges especially)
|
/// changes (merges especially)
|
||||||
#[derive(Default)]
|
|
||||||
pub struct SegmentManager {
|
pub struct SegmentManager {
|
||||||
registers: RwLock<SegmentRegisters>,
|
registers: Arc<RwLock<SegmentRegisters>>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Debug for SegmentManager {
|
impl Debug for SegmentManager {
|
||||||
@@ -46,11 +81,17 @@ pub fn get_mergeable_segments(
|
|||||||
let registers_lock = segment_manager.read();
|
let registers_lock = segment_manager.read();
|
||||||
(
|
(
|
||||||
registers_lock
|
registers_lock
|
||||||
.committed
|
.soft_committed
|
||||||
.get_mergeable_segments(in_merge_segment_ids),
|
.get_mergeable_segments(in_merge_segment_ids),
|
||||||
registers_lock
|
registers_lock
|
||||||
.uncommitted
|
.uncommitted
|
||||||
.get_mergeable_segments(in_merge_segment_ids),
|
.values()
|
||||||
|
.map(|segment_entry| segment_entry.meta())
|
||||||
|
.filter(|segment_meta| {
|
||||||
|
!in_merge_segment_ids.contains(&segment_meta.id())
|
||||||
|
})
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>()
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -58,21 +99,22 @@ impl SegmentManager {
|
|||||||
pub fn from_segments(
|
pub fn from_segments(
|
||||||
segment_metas: Vec<SegmentMeta>,
|
segment_metas: Vec<SegmentMeta>,
|
||||||
delete_cursor: &DeleteCursor,
|
delete_cursor: &DeleteCursor,
|
||||||
|
opstamp: u64,
|
||||||
) -> SegmentManager {
|
) -> SegmentManager {
|
||||||
SegmentManager {
|
SegmentManager {
|
||||||
registers: RwLock::new(SegmentRegisters {
|
registers: Arc::new(RwLock::new(SegmentRegisters {
|
||||||
uncommitted: SegmentRegister::default(),
|
uncommitted: HashMap::default(),
|
||||||
committed: SegmentRegister::new(segment_metas, delete_cursor),
|
committed: SegmentRegister::new(segment_metas.clone(), opstamp),
|
||||||
}),
|
soft_committed: SegmentRegister::new(segment_metas, opstamp),
|
||||||
|
delete_cursor: delete_cursor.clone(),
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns all of the segment entries (committed or uncommitted)
|
pub fn available_segments_view(&self) -> AvailableSegments {
|
||||||
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
|
AvailableSegments {
|
||||||
let registers_lock = self.read();
|
registers: self.registers.clone()
|
||||||
let mut segment_entries = registers_lock.uncommitted.segment_entries();
|
}
|
||||||
segment_entries.extend(registers_lock.committed.segment_entries());
|
|
||||||
segment_entries
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// List the files that are useful to the index.
|
/// List the files that are useful to the index.
|
||||||
@@ -108,44 +150,76 @@ impl SegmentManager {
|
|||||||
let mut registers_lock = self.write();
|
let mut registers_lock = self.write();
|
||||||
registers_lock
|
registers_lock
|
||||||
.committed
|
.committed
|
||||||
.segment_entries()
|
.segment_metas()
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|segment| segment.meta().num_docs() == 0)
|
.filter(|segment_meta| segment_meta.num_docs() == 0)
|
||||||
.for_each(|segment| {
|
.for_each(|segment_meta| {
|
||||||
registers_lock
|
registers_lock
|
||||||
.committed
|
.committed
|
||||||
.remove_segment(&segment.segment_id())
|
.remove_segment(&segment_meta.id())
|
||||||
|
});
|
||||||
|
registers_lock
|
||||||
|
.soft_committed
|
||||||
|
.segment_metas()
|
||||||
|
.iter()
|
||||||
|
.filter(|segment_meta| segment_meta.num_docs() == 0)
|
||||||
|
.for_each(|segment_meta| {
|
||||||
|
registers_lock
|
||||||
|
.committed
|
||||||
|
.remove_segment(&segment_meta.id())
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
|
/// Returns all of the segment entries (soft committed or uncommitted)
|
||||||
let mut registers_lock = self.write();
|
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
|
||||||
registers_lock.committed.clear();
|
let registers_lock = self.read();
|
||||||
registers_lock.uncommitted.clear();
|
let mut segment_entries: Vec<SegmentEntry > = registers_lock.uncommitted.values().cloned().collect();
|
||||||
for segment_entry in segment_entries {
|
segment_entries.extend(registers_lock.soft_committed.segment_entries(®isters_lock.delete_cursor).into_iter());
|
||||||
registers_lock.committed.add_segment_entry(segment_entry);
|
segment_entries
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Marks a list of segments as in merge.
|
|
||||||
|
pub fn commit(&self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
|
||||||
|
let mut registers_lock = self.write();
|
||||||
|
registers_lock.uncommitted.clear();
|
||||||
|
registers_lock
|
||||||
|
.committed
|
||||||
|
.set_commit(opstamp, segment_entries.clone());
|
||||||
|
registers_lock
|
||||||
|
.soft_committed
|
||||||
|
.set_commit(opstamp, segment_entries);
|
||||||
|
registers_lock.delete_cursor.skip_to(opstamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn soft_commit(&self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
|
||||||
|
let mut registers_lock = self.write();
|
||||||
|
registers_lock.uncommitted.clear();
|
||||||
|
registers_lock
|
||||||
|
.soft_committed
|
||||||
|
.set_commit(opstamp, segment_entries);
|
||||||
|
registers_lock.delete_cursor.skip_to(opstamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the list of segment_entries associated to a list of `segment_ids`.
|
||||||
|
/// This method is used when starting a merge operations.
|
||||||
///
|
///
|
||||||
/// Returns an error if some segments are missing, or if
|
/// Returns an error if some segments are missing, or if
|
||||||
/// the `segment_ids` are not either all committed or all
|
/// the `segment_ids` are not either all soft_committed or all
|
||||||
/// uncommitted.
|
/// uncommitted.
|
||||||
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
|
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
|
||||||
let registers_lock = self.read();
|
let registers_lock = self.read();
|
||||||
let mut segment_entries = vec![];
|
let mut segment_entries = vec![];
|
||||||
if registers_lock.uncommitted.contains_all(segment_ids) {
|
if segment_ids.iter().all(|segment_id| registers_lock.uncommitted.contains_key(segment_id)) {
|
||||||
for segment_id in segment_ids {
|
for segment_id in segment_ids {
|
||||||
let segment_entry = registers_lock.uncommitted
|
let segment_entry = registers_lock.uncommitted
|
||||||
.get(segment_id)
|
.get(segment_id)
|
||||||
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
||||||
segment_entries.push(segment_entry);
|
segment_entries.push(segment_entry.clone());
|
||||||
}
|
}
|
||||||
} else if registers_lock.committed.contains_all(segment_ids) {
|
} else if registers_lock.soft_committed.contains_all(segment_ids) {
|
||||||
for segment_id in segment_ids {
|
for segment_id in segment_ids {
|
||||||
let segment_entry = registers_lock.committed
|
let segment_entry = registers_lock.soft_committed
|
||||||
.get(segment_id)
|
.get(segment_id, ®isters_lock.delete_cursor)
|
||||||
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
||||||
segment_entries.push(segment_entry);
|
segment_entries.push(segment_entry);
|
||||||
}
|
}
|
||||||
@@ -160,35 +234,32 @@ impl SegmentManager
     pub fn add_segment(&self, segment_entry: SegmentEntry) {
         let mut registers_lock = self.write();
-        registers_lock.uncommitted.add_segment_entry(segment_entry);
+        registers_lock
+            .uncommitted
+            .insert(segment_entry.segment_id(), segment_entry);
     }

     pub fn end_merge(
         &self,
         before_merge_segment_ids: &[SegmentId],
-        after_merge_segment_entry: SegmentEntry,
+        after_merge_segment_entry: SegmentEntry
     ) {
         let mut registers_lock = self.write();
-        let target_register: &mut SegmentRegister = {
-            if registers_lock
-                .uncommitted
-                .contains_all(before_merge_segment_ids)
-            {
-                &mut registers_lock.uncommitted
-            } else if registers_lock
-                .committed
-                .contains_all(before_merge_segment_ids)
-            {
-                &mut registers_lock.committed
-            } else {
-                warn!("couldn't find segment in SegmentManager");
-                return;
-            }
-        };
-        for segment_id in before_merge_segment_ids {
-            target_register.remove_segment(segment_id);
-        }
-        target_register.add_segment_entry(after_merge_segment_entry);
+        if before_merge_segment_ids.iter().all(|seg_id|
+            registers_lock
+                .uncommitted
+                .contains_key(seg_id))
+        {
+            for segment_id in before_merge_segment_ids {
+                registers_lock.uncommitted.remove(&segment_id);
+            }
+            registers_lock.uncommitted.insert(after_merge_segment_entry.segment_id(),
+                                              after_merge_segment_entry);
+        } else {
+            registers_lock.committed.receive_merge(&before_merge_segment_ids, &after_merge_segment_entry);
+            registers_lock.soft_committed.receive_merge(&before_merge_segment_ids, &after_merge_segment_entry)
+        }
     }

     pub fn committed_segment_metas(&self) -> Vec<SegmentMeta> {
@@ -16,7 +16,8 @@ use std::fmt::{self, Debug, Formatter};
 /// merge candidates.
 #[derive(Default)]
 pub struct SegmentRegister {
-    segment_states: HashMap<SegmentId, SegmentEntry>,
+    segment_states: HashMap<SegmentId, SegmentMeta>,
+    opstamp_constraint: u64,
 }

 impl Debug for SegmentRegister {
@@ -41,23 +42,28 @@ impl SegmentRegister {
     ) -> Vec<SegmentMeta> {
         self.segment_states
             .values()
-            .filter(|segment_entry| !in_merge_segment_ids.contains(&segment_entry.segment_id()))
-            .map(|segment_entry| segment_entry.meta().clone())
+            .filter(|segment_meta| !in_merge_segment_ids.contains(&segment_meta.id()))
+            .cloned()
             .collect()
     }

-    pub fn segment_entries(&self) -> Vec<SegmentEntry> {
-        self.segment_states.values().cloned().collect()
-    }
-
     pub fn segment_metas(&self) -> Vec<SegmentMeta> {
-        let mut segment_ids: Vec<SegmentMeta> = self
+        let mut segment_metas: Vec<SegmentMeta> = self
             .segment_states
             .values()
-            .map(|segment_entry| segment_entry.meta().clone())
+            .cloned()
             .collect();
-        segment_ids.sort_by_key(|meta| meta.id());
-        segment_ids
+        segment_metas.sort_by_key(|meta| meta.id());
+        segment_metas
+    }
+
+    pub fn segment_entries(&self, delete_cursor: &DeleteCursor) -> Vec<SegmentEntry> {
+        self.segment_states
+            .values()
+            .map(|segment_meta| {
+                SegmentEntry::new(segment_meta.clone(), delete_cursor.clone(), None, self.opstamp_constraint)
+            })
+            .collect()
     }

     pub fn contains_all(&self, segment_ids: &[SegmentId]) -> bool {
@@ -66,27 +72,77 @@ impl SegmentRegister {
         .all(|segment_id| self.segment_states.contains_key(segment_id))
     }

-    pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) {
+    pub fn receive_merge(&mut self,
+                         before_merge_segment_ids: &[SegmentId],
+                         after_merge_segment_entry: &SegmentEntry) {
+        if after_merge_segment_entry.opstamp() != self.opstamp_constraint {
+            return;
+        }
+        if !self.contains_all(before_merge_segment_ids) {
+            return;
+        }
+        for segment_id in before_merge_segment_ids {
+            self.segment_states.remove(segment_id);
+        }
+        self.register_segment_entry(after_merge_segment_entry.clone());
+    }
+
+    /// Registers a `SegmentEntry`.
+    ///
+    /// If a segment entry associated to this `SegmentId` is already there,
+    /// override it with the new `SegmentEntry`.
+    pub fn register_segment_entry(&mut self, segment_entry: SegmentEntry) {
+        if self.opstamp_constraint != segment_entry.opstamp() {
+            panic!(format!(
+                "Invalid segment. Expect opstamp {}, got {}.",
+                self.opstamp_constraint,
+                segment_entry.opstamp()
+            ));
+        }
+        if segment_entry.meta().num_docs() == 0 {
+            return;
+        }
         let segment_id = segment_entry.segment_id();
-        self.segment_states.insert(segment_id, segment_entry);
+        // Check that we are ok with deletes.
+        self.segment_states.insert(segment_id, segment_entry.meta().clone());
+    }
+
+    pub fn set_commit(&mut self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
+        self.segment_states.clear();
+        self.opstamp_constraint = opstamp;
+        for segment_entry in segment_entries {
+            self.register_segment_entry(segment_entry);
+        }
     }

     pub fn remove_segment(&mut self, segment_id: &SegmentId) {
-        self.segment_states.remove(segment_id);
+        self.segment_states.remove(&segment_id);
     }

-    pub fn get(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
-        self.segment_states.get(segment_id).cloned()
+    pub fn get(&self, segment_id: &SegmentId, delete_cursor: &DeleteCursor) -> Option<SegmentEntry> {
+        self.segment_states
+            .get(&segment_id)
+            .map(|segment_meta|
+                SegmentEntry::new(
+                    segment_meta.clone(),
+                    delete_cursor.clone(),
+                    None,
+                    self.opstamp_constraint
+                ))
     }

-    pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
+    pub fn new(
+        segment_metas: Vec<SegmentMeta>,
+        opstamp: u64,
+    ) -> SegmentRegister {
         let mut segment_states = HashMap::new();
         for segment_meta in segment_metas {
-            let segment_id = segment_meta.id();
-            let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None);
-            segment_states.insert(segment_id, segment_entry);
+            segment_states.insert(segment_meta.id(), segment_meta);
         }
-        SegmentRegister { segment_states }
+        SegmentRegister {
+            segment_states,
+            opstamp_constraint: opstamp,
+        }
     }
 }
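Note the new invariant carried by `opstamp_constraint`: a register now represents the state of the index at one specific opstamp. A hedged sketch of how the crate-internal API above would be exercised (the constructor signature is the one reconstructed in this diff):

    // Build a register pinned to the commit with opstamp 42.
    let register = SegmentRegister::new(Vec::new(), 42u64);
    // Every SegmentEntry later handed to this register must carry
    // opstamp 42: register_segment_entry panics on a mismatch, and
    // receive_merge silently ignores a merge from a stale opstamp.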
@@ -115,22 +171,22 @@ mod tests {
     let segment_id_merged = SegmentId::generate_random();

     {
-        let segment_meta = SegmentMeta::new(segment_id_a, 0u32);
-        let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
-        segment_register.add_segment_entry(segment_entry);
+        let segment_meta = SegmentMeta::new(segment_id_a, 1u32);
+        let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None, 0u64);
+        segment_register.register_segment_entry(segment_entry);
     }
     assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
     {
-        let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
-        let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
-        segment_register.add_segment_entry(segment_entry);
+        let segment_meta = SegmentMeta::new(segment_id_b, 2u32);
+        let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None, 0u64);
+        segment_register.register_segment_entry(segment_entry);
     }
-    segment_register.remove_segment(&segment_id_a);
-    segment_register.remove_segment(&segment_id_b);
     {
-        let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32);
-        let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
-        segment_register.add_segment_entry(segment_entry);
+        let segment_meta_merged = SegmentMeta::new(segment_id_merged, 3u32);
+        let segment_entry =
+            SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None, 0u64);
+        segment_register.receive_merge(&[segment_id_a, segment_id_b], &segment_entry);
+        segment_register.register_segment_entry(segment_entry);
     }
     assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]);
 }
@@ -125,7 +125,7 @@ fn perform_merge(

     let segment_meta = SegmentMeta::new(merged_segment.id(), num_docs);

-    let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None);
+    let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None, target_opstamp);
     Ok(after_merge_segment_entry)
 }
@@ -155,8 +155,11 @@ impl SegmentUpdater {
         stamper: Stamper,
         delete_cursor: &DeleteCursor,
     ) -> Result<SegmentUpdater> {
+        let index_meta = index.load_metas()?;
         let segments = index.searchable_segment_metas()?;
-        let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
+        let opstamp = index_meta.opstamp;
+        let segment_manager = SegmentManager::from_segments(segments, delete_cursor, opstamp);
         let pool = CpuPoolBuilder::new()
             .name_prefix("segment_updater")
             .pool_size(1)
@@ -280,14 +283,30 @@ impl SegmentUpdater {
             .garbage_collect(|| self.0.segment_manager.list_files());
     }

-    pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
+    pub fn commit(&self, opstamp: u64, payload: Option<String>, soft: bool) -> Result<()> {
         self.run_async(move |segment_updater| {
             if segment_updater.is_alive() {
                 let segment_entries = segment_updater
                     .purge_deletes(opstamp)
                     .expect("Failed purge deletes");
-                segment_updater.0.segment_manager.commit(segment_entries);
-                segment_updater.save_metas(opstamp, payload);
+                if soft {
+                    // Soft commit.
+                    //
+                    // The list `segment_entries` above is what we might want to use as searchable
+                    // segment. However, we do not want to mark them as committed, and we want
+                    // to keep the current set of committed segment.
+                    segment_updater.0.segment_manager.soft_commit(opstamp, segment_entries);
+                    // ... We do not save the meta file.
+                } else {
+                    // Hard_commit. We register the new segment entries as committed.
+                    segment_updater
+                        .0
+                        .segment_manager
+                        .commit(opstamp, segment_entries);
+                    // TODO error handling.
+                    segment_updater.save_metas(opstamp, payload);
+                    segment_updater.0.index.directory().flush().unwrap();
+                }
                 segment_updater.garbage_collect_files_exec();
                 segment_updater.consider_merge_options();
             }
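A hypothetical call site for the unified entry point above, assuming a writer that forwards to `SegmentUpdater::commit` (illustrative only):

    // Durable commit: registers the segments, saves meta.json, flushes.
    segment_updater.commit(opstamp, payload, /* soft = */ false)?;
    // Soft commit: the segments become searchable but nothing is
    // persisted; a crash rolls back to the last hard commit.
    segment_updater.commit(opstamp, None, /* soft = */ true)?;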
@@ -420,6 +439,7 @@ impl SegmentUpdater {
             })
             .collect::<Vec<_>>();
         merge_candidates.extend(committed_merge_candidates.into_iter());
+
         for merge_operation in merge_candidates {
             match self.start_merge_impl(merge_operation) {
                 Ok(merge_future) => {
@@ -174,6 +174,7 @@ extern crate downcast_rs;
 #[macro_use]
 extern crate fail;

+#[cfg(feature = "mmap")]
 #[cfg(test)]
 mod functional_test;

src/postings/block_search.rs (new file, 229 lines)
@@ -0,0 +1,229 @@
/// This modules define the logic used to search for a doc in a given
/// block. (at most 128 docs)
///
/// Searching within a block is a hotspot when running intersection.
/// so it was worth defining it in its own module.

#[cfg(target_arch = "x86_64")]
mod sse2 {
    use postings::compression::COMPRESSION_BLOCK_SIZE;
    use std::arch::x86_64::__m128i as DataType;
    use std::arch::x86_64::_mm_add_epi32 as op_add;
    use std::arch::x86_64::_mm_cmplt_epi32 as op_lt;
    use std::arch::x86_64::_mm_load_si128 as op_load; // requires 128-bits alignment
    use std::arch::x86_64::_mm_set1_epi32 as set1;
    use std::arch::x86_64::_mm_setzero_si128 as set0;
    use std::arch::x86_64::_mm_sub_epi32 as op_sub;
    use std::arch::x86_64::{_mm_cvtsi128_si32, _mm_shuffle_epi32};

    const MASK1: i32 = 78;
    const MASK2: i32 = 177;

    /// Performs an exhaustive linear search over the
    ///
    /// There is no early exit here. We simply count the
    /// number of elements that are `< target`.
    pub fn linear_search_sse2_128(arr: &[u32], target: u32) -> usize {
        unsafe {
            let ptr = arr.as_ptr() as *const DataType;
            let vkey = set1(target as i32);
            let mut cnt = set0();
            // We work over 4 `__m128i` at a time.
            // A single `__m128i` actual contains 4 `u32`.
            for i in 0..(COMPRESSION_BLOCK_SIZE as isize) / (4 * 4) {
                let cmp1 = op_lt(op_load(ptr.offset(i * 4)), vkey);
                let cmp2 = op_lt(op_load(ptr.offset(i * 4 + 1)), vkey);
                let cmp3 = op_lt(op_load(ptr.offset(i * 4 + 2)), vkey);
                let cmp4 = op_lt(op_load(ptr.offset(i * 4 + 3)), vkey);
                let sum = op_add(op_add(cmp1, cmp2), op_add(cmp3, cmp4));
                cnt = op_sub(cnt, sum);
            }
            cnt = op_add(cnt, _mm_shuffle_epi32(cnt, MASK1));
            cnt = op_add(cnt, _mm_shuffle_epi32(cnt, MASK2));
            _mm_cvtsi128_si32(cnt) as usize
        }
    }

    #[cfg(test)]
    mod test {
        use super::linear_search_sse2_128;

        #[test]
        fn test_linear_search_sse2_128_u32() {
            for i in 0..23 {
                dbg!(i);
                let arr: Vec<u32> = (0..128).map(|el| el * 2 + 1 << 18).collect();
                assert_eq!(linear_search_sse2_128(&arr, arr[64] + 1), 65);
            }
        }
    }
}
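For readers unfamiliar with the intrinsics above: `_mm_cmplt_epi32` sets a lane to -1 (all ones) when the element is smaller than the target, so `cnt = cnt - sum` accumulates a per-lane count of matching elements (the compare is signed, which is fine as long as doc ids stay below 2^31). The two shuffles then perform a horizontal sum: 78 is 0b01001110 (swap the two 64-bit halves) and 177 is 0b10110001 (swap adjacent 32-bit lanes), after which every lane holds the total and `_mm_cvtsi128_si32` extracts it. A scalar sketch of the same computation, for exposition only:

    fn linear_search_scalar_128(arr: &[u32; 128], target: u32) -> usize {
        // No early exit, exactly like the SIMD kernel: count every
        // element strictly smaller than the target.
        arr.iter().filter(|&&el| el < target).count()
    }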
/// This `linear search` browser exhaustively through the array.
/// but the early exit is very difficult to predict.
///
/// Coupled with `exponential search` this function is likely
/// to be called with the same `len`
fn linear_search(arr: &[u32], target: u32) -> usize {
    arr.iter().map(|&el| if el < target { 1 } else { 0 }).sum()
}

fn exponential_search(arr: &[u32], target: u32) -> (usize, usize) {
    let end = arr.len();
    let mut begin = 0;
    for &pivot in &[1, 3, 7, 15, 31, 63] {
        if pivot >= end {
            break;
        }
        if arr[pivot] > target {
            return (begin, pivot);
        }
        begin = pivot;
    }
    (begin, end)
}

fn galloping(block_docs: &[u32], target: u32) -> usize {
    let (start, end) = exponential_search(&block_docs, target);
    start + linear_search(&block_docs[start..end], target)
}

/// Tantivy may rely on SIMD instructions to search for a specific document within
/// a given block.
#[derive(Clone, Copy, PartialEq)]
pub enum BlockSearcher {
    #[cfg(target_arch = "x86_64")]
    SSE2,
    Scalar,
}

impl BlockSearcher {
    /// Search the first index containing an element greater or equal to
    /// the target.
    ///
    /// The results should be equivalent to
    /// ```ignore
    /// block[..]
    //      .iter()
    //      .take_while(|&&val| val < target)
    //      .count()
    /// ```
    ///
    /// The `start` argument is just used to hint that the response is
    /// greater than beyond `start`. The implementation may or may not use
    /// it for optimization.
    ///
    /// # Assumption
    ///
    /// The array len is > start.
    /// The block is sorted
    /// The target is assumed greater or equal to the `arr[start]`.
    /// The target is assumed smaller or equal to the last element of the block.
    ///
    /// Currently the scalar implementation starts by an exponential search, and
    /// then operates a linear search in the result subarray.
    ///
    /// If SSE2 instructions are available in the `(platform, running CPU)`,
    /// then we use a different implementation that does an exhaustive linear search over
    /// the full block whenever the block is full (`len == 128`). It is surprisingly faster, most likely because of the lack
    /// of branch.
    pub fn search_in_block(&self, block_docs: &[u32], start: usize, target: u32) -> usize {
        #[cfg(target_arch = "x86_64")]
        {
            use postings::compression::COMPRESSION_BLOCK_SIZE;
            if *self == BlockSearcher::SSE2 {
                if block_docs.len() == COMPRESSION_BLOCK_SIZE {
                    return sse2::linear_search_sse2_128(block_docs, target);
                }
            }
        }
        start + galloping(&block_docs[start..], target)
    }
}

impl Default for BlockSearcher {
    fn default() -> BlockSearcher {
        #[cfg(target_arch = "x86_64")]
        {
            if is_x86_feature_detected!("sse2") {
                return BlockSearcher::SSE2;
            }
        }
        BlockSearcher::Scalar
    }
}

#[cfg(test)]
mod tests {
    use super::exponential_search;
    use super::linear_search;
    use super::BlockSearcher;

    #[test]
    fn test_linear_search() {
        let len: usize = 50;
        let arr: Vec<u32> = (0..len).map(|el| 1u32 + (el as u32) * 2).collect();
        for target in 1..*arr.last().unwrap() {
            let res = linear_search(&arr[..], target);
            if res > 0 {
                assert!(arr[res - 1] < target);
            }
            if res < len {
                assert!(arr[res] >= target);
            }
        }
    }

    #[test]
    fn test_exponentiel_search() {
        assert_eq!(exponential_search(&[1, 2], 0), (0, 1));
        assert_eq!(exponential_search(&[1, 2], 1), (0, 1));
        assert_eq!(
            exponential_search(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 7),
            (3, 7)
        );
    }

    fn util_test_search_in_block(block_searcher: BlockSearcher, block: &[u32], target: u32) {
        let cursor = search_in_block_trivial_but_slow(block, target);
        for i in 0..cursor {
            assert_eq!(block_searcher.search_in_block(block, i, target), cursor);
        }
    }

    fn util_test_search_in_block_all(block_searcher: BlockSearcher, block: &[u32]) {
        use std::collections::HashSet;
        let mut targets = HashSet::new();
        for (i, val) in block.iter().cloned().enumerate() {
            if i > 0 {
                targets.insert(val - 1);
            }
            targets.insert(val);
        }
        for target in targets {
            util_test_search_in_block(block_searcher, block, target);
        }
    }

    fn search_in_block_trivial_but_slow(block: &[u32], target: u32) -> usize {
        block.iter().take_while(|&&val| val < target).count()
    }

    fn test_search_in_block_util(block_searcher: BlockSearcher) {
        for len in 1u32..128u32 {
            let v: Vec<u32> = (0..len).map(|i| i * 2).collect();
            util_test_search_in_block_all(block_searcher, &v[..]);
        }
    }

    #[test]
    fn test_search_in_block_scalar() {
        test_search_in_block_util(BlockSearcher::Scalar);
    }

    #[cfg(target_arch = "x86_64")]
    #[test]
    fn test_search_in_block_sse2() {
        test_search_in_block_util(BlockSearcher::SSE2);
    }
}
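A worked example of the scalar strategy above (illustrative only, since `galloping` is private): with arr[i] = 3 * i for i in 0..100 and target 50, `exponential_search` probes arr[1]=3, arr[3]=9, arr[7]=21, arr[15]=45 and arr[31]=93; since 93 > 50 it returns the bracket (15, 31), and the branch-light `linear_search` over arr[15..31] counts the two remaining elements below 50 (45 and 48):

    let arr: Vec<u32> = (0..100).map(|i| i * 3).collect();
    assert_eq!(galloping(&arr, 50), 17); // arr[17] == 51, the first doc >= 50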
@@ -43,9 +43,14 @@ impl BlockEncoder {
     }
 }

+/// We ensure that the OutputBuffer is align on 128 bits
+/// in order to run SSE2 linear search on it.
+#[repr(align(128))]
+struct OutputBuffer([u32; COMPRESSION_BLOCK_SIZE + 1]);
+
 pub struct BlockDecoder {
     bitpacker: BitPacker4x,
-    pub output: [u32; COMPRESSION_BLOCK_SIZE + 1],
+    output: OutputBuffer,
     pub output_len: usize,
 }
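A note on the alignment trick above: `_mm_load_si128` is the aligned load, and feeding it a pointer that is not 16-byte aligned is undefined behaviour. The attribute as written requests 128-byte alignment (the comment says bits), which more than satisfies the 16-byte requirement. A quick sanity check, assuming the type above:

    assert!(std::mem::align_of::<OutputBuffer>() >= 16);  // SSE2 needs 16 bytes
    assert_eq!(std::mem::align_of::<OutputBuffer>(), 128); // what align(128) grants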
@@ -59,7 +64,7 @@ impl BlockDecoder {
         output[COMPRESSION_BLOCK_SIZE] = 0u32;
         BlockDecoder {
             bitpacker: BitPacker4x::new(),
-            output,
+            output: OutputBuffer(output),
             output_len: 0,
         }
     }
@@ -72,23 +77,23 @@ impl BlockDecoder {
     ) -> usize {
         self.output_len = COMPRESSION_BLOCK_SIZE;
         self.bitpacker
-            .decompress_sorted(offset, &compressed_data, &mut self.output, num_bits)
+            .decompress_sorted(offset, &compressed_data, &mut self.output.0, num_bits)
     }

     pub fn uncompress_block_unsorted(&mut self, compressed_data: &[u8], num_bits: u8) -> usize {
         self.output_len = COMPRESSION_BLOCK_SIZE;
         self.bitpacker
-            .decompress(&compressed_data, &mut self.output, num_bits)
+            .decompress(&compressed_data, &mut self.output.0, num_bits)
     }

     #[inline]
     pub fn output_array(&self) -> &[u32] {
-        &self.output[..self.output_len]
+        &self.output.0[..self.output_len]
     }

     #[inline]
     pub fn output(&self, idx: usize) -> u32 {
-        self.output[idx]
+        self.output.0[idx]
     }
 }
@@ -159,12 +164,12 @@ impl VIntDecoder for BlockDecoder {
         num_els: usize,
     ) -> usize {
         self.output_len = num_els;
-        vint::uncompress_sorted(compressed_data, &mut self.output[..num_els], offset)
+        vint::uncompress_sorted(compressed_data, &mut self.output.0[..num_els], offset)
     }

     fn uncompress_vint_unsorted<'a>(&mut self, compressed_data: &'a [u8], num_els: usize) -> usize {
         self.output_len = num_els;
-        vint::uncompress_unsorted(compressed_data, &mut self.output[..num_els])
+        vint::uncompress_unsorted(compressed_data, &mut self.output.0[..num_els])
     }
 }
@@ -2,6 +2,7 @@
 Postings module (also called inverted index)
 */

+mod block_search;
 pub(crate) mod compression;
 /// Postings module
 ///
@@ -16,6 +17,8 @@ mod skip;
 mod stacker;
 mod term_info;

+pub(crate) use self::block_search::BlockSearcher;
+
 pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
 pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
@@ -104,9 +107,7 @@ pub mod tests {
     let searcher = index.reader().unwrap().searcher();
     let inverted_index = searcher.segment_reader(0u32).inverted_index(title);
     let term = Term::from_field_text(title, "abc");
-
     let mut positions = Vec::new();
-
     {
         let mut postings = inverted_index
             .read_postings(&term, IndexRecordOption::WithFreqsAndPositions)
@@ -7,6 +7,7 @@ use positions::PositionReader;
 use postings::compression::compressed_block_size;
 use postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE};
 use postings::serializer::PostingsSerializer;
+use postings::BlockSearcher;
 use postings::FreqReadingOption;
 use postings::Postings;
 use postings::SkipReader;
@@ -60,6 +61,7 @@ pub struct SegmentPostings {
     block_cursor: BlockSegmentPostings,
     cur: usize,
     position_computer: Option<PositionComputer>,
+    block_searcher: BlockSearcher,
 }

 impl SegmentPostings {
@@ -70,6 +72,7 @@ impl SegmentPostings {
             block_cursor: empty_block_cursor,
             cur: COMPRESSION_BLOCK_SIZE,
             position_computer: None,
+            block_searcher: BlockSearcher::default(),
         }
     }
@@ -117,42 +120,31 @@ impl SegmentPostings {
             block_cursor: segment_block_postings,
             cur: COMPRESSION_BLOCK_SIZE, // cursor within the block
             position_computer: positions_stream_opt.map(PositionComputer::new),
+            block_searcher: BlockSearcher::default(),
         }
     }
 }

-fn linear_search(arr: &[u32], target: u32) -> usize {
-    arr.iter().map(|&el| if el < target { 1 } else { 0 }).sum()
-}
-
-fn exponential_search(arr: &[u32], target: u32) -> (usize, usize) {
-    let end = arr.len();
-    let mut begin = 0;
-    for &pivot in &[1, 3, 7, 15, 31, 63] {
-        if pivot >= end {
-            break;
-        }
-        if arr[pivot] > target {
-            return (begin, pivot);
-        }
-        begin = pivot;
-    }
-    (begin, end)
-}
-
-/// Search the first index containing an element greater or equal to the target.
-///
-/// # Assumption
-///
-/// The array is assumed non empty.
-/// The target is assumed greater or equal to the first element.
-/// The target is assumed smaller or equal to the last element.
-fn search_within_block(block_docs: &[u32], target: u32) -> usize {
-    let (start, end) = exponential_search(block_docs, target);
-    start + linear_search(&block_docs[start..end], target)
-}
-
 impl DocSet for SegmentPostings {
+    // goes to the next element.
+    // next needs to be called a first time to point to the correct element.
+    #[inline]
+    fn advance(&mut self) -> bool {
+        if self.position_computer.is_some() {
+            let term_freq = self.term_freq() as usize;
+            self.position_computer.as_mut().unwrap().add_skip(term_freq);
+        }
+        self.cur += 1;
+        if self.cur >= self.block_cursor.block_len() {
+            self.cur = 0;
+            if !self.block_cursor.advance() {
+                self.cur = COMPRESSION_BLOCK_SIZE;
+                return false;
+            }
+        }
+        true
+    }
+
     fn skip_next(&mut self, target: DocId) -> SkipResult {
         if !self.advance() {
             return SkipResult::End;
@@ -211,9 +203,8 @@ impl DocSet for SegmentPostings {
         // we're in the right block now, start with an exponential search
         let block_docs = self.block_cursor.docs();
         let new_cur = self
-            .cur
-            .wrapping_add(search_within_block(&block_docs[self.cur..], target));
+            .block_searcher
+            .search_in_block(&block_docs, self.cur, target);

         if need_positions {
             sum_freqs_skipped += self.block_cursor.freqs()[self.cur..new_cur]
                 .iter()
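The contract relied upon here is the one documented on `search_in_block`: return the number of docs in the block strictly smaller than the target, i.e. the index of the first doc greater or equal to it. A hedged illustration with a toy block (`BlockSearcher` is crate-private, so this is not public API):

    let block = [3u32, 7, 9, 12];
    let searcher = BlockSearcher::default();
    // Two docs (3 and 7) are smaller than 9, so the cursor lands on index 2.
    assert_eq!(searcher.search_in_block(&block, 0, 9), 2);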
@@ -235,29 +226,6 @@ impl DocSet for SegmentPostings {
         }
     }

-    // goes to the next element.
-    // next needs to be called a first time to point to the correct element.
-    #[inline]
-    fn advance(&mut self) -> bool {
-        if self.position_computer.is_some() {
-            let term_freq = self.term_freq() as usize;
-            self.position_computer.as_mut().unwrap().add_skip(term_freq);
-        }
-        self.cur += 1;
-        if self.cur >= self.block_cursor.block_len() {
-            self.cur = 0;
-            if !self.block_cursor.advance() {
-                self.cur = COMPRESSION_BLOCK_SIZE;
-                return false;
-            }
-        }
-        true
-    }
-
-    fn size_hint(&self) -> u32 {
-        self.len() as u32
-    }
-
     /// Return the current document's `DocId`.
     #[inline]
     fn doc(&self) -> DocId {
@@ -269,6 +237,10 @@ impl DocSet for SegmentPostings {
         docs[self.cur]
     }

+    fn size_hint(&self) -> u32 {
+        self.len() as u32
+    }
+
     fn append_to_bitset(&mut self, bitset: &mut BitSet) {
         // finish the current block
         if self.advance() {
@@ -614,10 +586,6 @@ impl<'b> Streamer<'b> for BlockSegmentPostings {

 #[cfg(test)]
 mod tests {
-    use super::exponential_search;
-    use super::linear_search;
-    use super::search_within_block;
     use super::BlockSegmentPostings;
     use super::BlockSegmentPostingsSkipResult;
     use super::SegmentPostings;
@@ -632,21 +600,6 @@ mod tests {
     use DocId;
     use SkipResult;

-    #[test]
-    fn test_linear_search() {
-        let len: usize = 50;
-        let arr: Vec<u32> = (0..len).map(|el| 1u32 + (el as u32) * 2).collect();
-        for target in 1..*arr.last().unwrap() {
-            let res = linear_search(&arr[..], target);
-            if res > 0 {
-                assert!(arr[res - 1] < target);
-            }
-            if res < len {
-                assert!(arr[res] >= target);
-            }
-        }
-    }
-
     #[test]
     fn test_empty_segment_postings() {
         let mut postings = SegmentPostings::empty();
@@ -662,56 +615,6 @@ mod tests {
         assert_eq!(postings.doc_freq(), 0);
     }

-    fn search_within_block_trivial_but_slow(block: &[u32], target: u32) -> usize {
-        block
-            .iter()
-            .cloned()
-            .enumerate()
-            .filter(|&(_, ref val)| *val >= target)
-            .next()
-            .unwrap()
-            .0
-    }
-
-    #[test]
-    fn test_exponentiel_search() {
-        assert_eq!(exponential_search(&[1, 2], 0), (0, 1));
-        assert_eq!(exponential_search(&[1, 2], 1), (0, 1));
-        assert_eq!(
-            exponential_search(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 7),
-            (3, 7)
-        );
-    }
-
-    fn util_test_search_within_block(block: &[u32], target: u32) {
-        assert_eq!(
-            search_within_block(block, target),
-            search_within_block_trivial_but_slow(block, target)
-        );
-    }
-
-    fn util_test_search_within_block_all(block: &[u32]) {
-        use std::collections::HashSet;
-        let mut targets = HashSet::new();
-        for (i, val) in block.iter().cloned().enumerate() {
-            if i > 0 {
-                targets.insert(val - 1);
-            }
-            targets.insert(val);
-        }
-        for target in targets {
-            util_test_search_within_block(block, target);
-        }
-    }
-
-    #[test]
-    fn test_search_within_block() {
-        for len in 1u32..128u32 {
-            let v: Vec<u32> = (0..len).map(|i| i * 2).collect();
-            util_test_search_within_block_all(&v[..]);
-        }
-    }
-
     #[test]
     fn test_block_segment_postings() {
         let mut block_segments = build_block_postings(&(0..100_000).collect::<Vec<u32>>());
@@ -14,41 +14,35 @@ use Score;
 /// specialized implementation if the two
 /// shortest scorers are `TermScorer`s.
 pub fn intersect_scorers(mut scorers: Vec<Box<Scorer>>) -> Box<Scorer> {
+    if scorers.is_empty() {
+        return Box::new(EmptyScorer);
+    }
+    if scorers.len() == 1 {
+        return scorers.pop().unwrap();
+    }
+    // We know that we have at least 2 elements.
     let num_docsets = scorers.len();
     scorers.sort_by(|left, right| right.size_hint().cmp(&left.size_hint()));
-    let rarest_opt = scorers.pop();
-    let second_rarest_opt = scorers.pop();
+    let left = scorers.pop().unwrap();
+    let right = scorers.pop().unwrap();
     scorers.reverse();
-    match (rarest_opt, second_rarest_opt) {
-        (None, None) => Box::new(EmptyScorer),
-        (Some(single_docset), None) => single_docset,
-        (Some(left), Some(right)) => {
-            {
-                let all_term_scorers = [&left, &right]
-                    .iter()
-                    .all(|&scorer| scorer.is::<TermScorer>());
-                if all_term_scorers {
-                    let left = *(left.downcast::<TermScorer>().map_err(|_| ()).unwrap());
-                    let right = *(right.downcast::<TermScorer>().map_err(|_| ()).unwrap());
-                    return Box::new(Intersection {
-                        left,
-                        right,
-                        others: scorers,
-                        num_docsets,
-                    });
-                }
-            }
-            Box::new(Intersection {
-                left,
-                right,
-                others: scorers,
-                num_docsets,
-            })
-        }
-        _ => {
-            unreachable!();
-        }
-    }
+    let all_term_scorers = [&left, &right]
+        .iter()
+        .all(|&scorer| scorer.is::<TermScorer>());
+    if all_term_scorers {
+        return Box::new(Intersection {
+            left: *(left.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
+            right: *(right.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
+            others: scorers,
+            num_docsets,
+        });
+    }
+    Box::new(Intersection {
+        left,
+        right,
+        others: scorers,
+        num_docsets,
+    })
 }
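A small illustration of the ordering logic above (plain integers standing in for scorers and their size hints): sorting in descending order and popping twice leaves the two rarest scorers to drive the intersection, while the rest are only consulted for confirmation.

    let mut hints = vec![1000u32, 10, 300];
    hints.sort_by(|l, r| r.cmp(l));   // [1000, 300, 10]
    let left = hints.pop().unwrap();  // 10: the rarest docset
    let right = hints.pop().unwrap(); // 300: the second rarest
    hints.reverse();                  // remaining docsets, cheapest first
    assert_eq!((left, right, hints), (10, 300, vec![1000]));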
 /// Creates a `DocSet` that iterator through the intersection of two `DocSet`s.
@@ -124,7 +118,6 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOtherDocSet> {
                     return false;
                 }
             }
-
             match left.skip_next(candidate) {
                 SkipResult::Reached => {
                     break;
@@ -140,35 +133,36 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOtherDocSet> {
             }
             // test the remaining scorers;
             for (ord, docset) in self.others.iter_mut().enumerate() {
-                if ord != other_candidate_ord {
-                    // `candidate_ord` is already at the
-                    // right position.
-                    //
-                    // Calling `skip_next` would advance this docset
-                    // and miss it.
-                    match docset.skip_next(candidate) {
-                        SkipResult::Reached => {}
-                        SkipResult::OverStep => {
-                            // this is not in the intersection,
-                            // let's update our candidate.
-                            candidate = docset.doc();
-                            match left.skip_next(candidate) {
-                                SkipResult::Reached => {
-                                    other_candidate_ord = ord;
-                                }
-                                SkipResult::OverStep => {
-                                    candidate = left.doc();
-                                    other_candidate_ord = usize::max_value();
-                                }
-                                SkipResult::End => {
-                                    return false;
-                                }
-                            }
-                            continue 'outer;
-                        }
-                        SkipResult::End => {
-                            return false;
-                        }
-                    }
-                }
+                if ord == other_candidate_ord {
+                    continue;
+                }
+                // `candidate_ord` is already at the
+                // right position.
+                //
+                // Calling `skip_next` would advance this docset
+                // and miss it.
+                match docset.skip_next(candidate) {
+                    SkipResult::Reached => {}
+                    SkipResult::OverStep => {
+                        // this is not in the intersection,
+                        // let's update our candidate.
+                        candidate = docset.doc();
+                        match left.skip_next(candidate) {
+                            SkipResult::Reached => {
+                                other_candidate_ord = ord;
+                            }
+                            SkipResult::OverStep => {
+                                candidate = left.doc();
+                                other_candidate_ord = usize::max_value();
+                            }
+                            SkipResult::End => {
+                                return false;
+                            }
+                        }
+                        continue 'outer;
+                    }
+                    SkipResult::End => {
+                        return false;
+                    }
+                }
             }
         }
     }
@@ -10,6 +10,7 @@ use Index;
 use Result;
 use Searcher;
 use SegmentReader;
+use schema::Schema;

 /// Defines when a new version of the index should be reloaded.
 ///
@@ -158,6 +159,11 @@ pub struct IndexReader {
 }

 impl IndexReader {
+    pub fn schema(&self) -> Schema {
+        self.inner.index.schema()
+    }
+
     /// Update searchers so that they reflect the state of the last
     /// `.commit()`.
     ///
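A hypothetical use of the new accessor, given an `Index` (the reader simply forwards to the schema of the index it was built from):

    let reader = index.reader()?;
    let schema = reader.schema(); // same Schema as index.schema()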