mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-30 14:02:55 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5f07dc35d8 |
15
CHANGELOG.md
15
CHANGELOG.md
@@ -1,18 +1,3 @@
|
||||
Tantivy 0.9.0
|
||||
=====================
|
||||
- Removed most unsafe (@fulmicoton)
|
||||
- Indexer memory footprint improved. (VInt comp, inlining the first block. (@fulmicoton)
|
||||
- Stemming in other language possible (@pentlander)
|
||||
- Segments with no docs are deleted earlier (@barrotsteindev)
|
||||
|
||||
Tantivy 0.8.1
|
||||
=====================
|
||||
Hotfix of #476.
|
||||
|
||||
Merge was reflecting deletes before commit was passed.
|
||||
Thanks @barrotsteindev for reporting the bug.
|
||||
|
||||
|
||||
Tantivy 0.8.0
|
||||
=====================
|
||||
*No change in the index format*
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy"
|
||||
version = "0.9.0-dev"
|
||||
version = "0.8.2"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = ["database-implementations", "data-structures"]
|
||||
@@ -29,7 +29,6 @@ serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
num_cpus = "1.2"
|
||||
fs2={version="0.4", optional=true}
|
||||
itertools = "0.8"
|
||||
levenshtein_automata = {version="0.1", features=["fst_automaton"]}
|
||||
bit-set = "0.5"
|
||||
@@ -40,10 +39,10 @@ futures-cpupool = "0.1"
|
||||
owning_ref = "0.4"
|
||||
stable_deref_trait = "1.0.0"
|
||||
rust-stemmers = "1"
|
||||
downcast-rs = { version="1.0" }
|
||||
downcast = { version="0.9" }
|
||||
matches = "0.1"
|
||||
bitpacking = "0.5"
|
||||
census = "0.2"
|
||||
census = "0.1"
|
||||
fnv = "1.0.6"
|
||||
owned-read = "0.4"
|
||||
failure = "0.1"
|
||||
@@ -71,7 +70,7 @@ overflow-checks = true
|
||||
[features]
|
||||
# by default no-fail is disabled. We manually enable it when running test.
|
||||
default = ["mmap", "no_fail"]
|
||||
mmap = ["fst/mmap", "atomicwrites", "fs2"]
|
||||
mmap = ["fst/mmap", "atomicwrites"]
|
||||
lz4-compression = ["lz4"]
|
||||
no_fail = ["fail/no_fail"]
|
||||
unstable = [] # useful for benches.
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
|
||||
**Tantivy** is a **full text search engine library** written in rust.
|
||||
|
||||
It is closer to [Apache Lucene](https://lucene.apache.org/) than to [Elasticsearch](https://www.elastic.co/products/elasticsearch) and [Apache Solr](https://lucene.apache.org/solr/) in the sense it is not
|
||||
It is closer to [Apache Lucene](https://lucene.apache.org/) than to [Elastic Search](https://www.elastic.co/products/elasticsearch) and [Apache Solr](https://lucene.apache.org/solr/) in the sense it is not
|
||||
an off-the-shelf search engine server, but rather a crate that can be used
|
||||
to build such a search engine.
|
||||
|
||||
@@ -76,7 +76,7 @@ It will walk you through getting a wikipedia search engine up and running in a f
|
||||
Tantivy compiles on stable rust but requires `Rust >= 1.27`.
|
||||
To check out and run tests, you can simply run :
|
||||
|
||||
git clone https://github.com/tantivy-search/tantivy.git
|
||||
git clone git@github.com:tantivy-search/tantivy.git
|
||||
cd tantivy
|
||||
cargo build
|
||||
|
||||
|
||||
@@ -88,7 +88,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema.clone());
|
||||
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
for i in 0u64..10u64 {
|
||||
index_writer.add_document(doc!(
|
||||
|
||||
@@ -85,7 +85,7 @@ See the `custom_collector` example.
|
||||
|
||||
*/
|
||||
|
||||
use downcast_rs;
|
||||
use downcast;
|
||||
use DocId;
|
||||
use Result;
|
||||
use Score;
|
||||
@@ -111,9 +111,9 @@ pub use self::facet_collector::FacetCollector;
|
||||
|
||||
/// `Fruit` is the type for the result of our collection.
|
||||
/// e.g. `usize` for the `Count` collector.
|
||||
pub trait Fruit: Send + downcast_rs::Downcast {}
|
||||
pub trait Fruit: Send + downcast::Any {}
|
||||
|
||||
impl<T> Fruit for T where T: Send + downcast_rs::Downcast {}
|
||||
impl<T> Fruit for T where T: Send + downcast::Any {}
|
||||
|
||||
/// Collectors are in charge of collecting and retaining relevant
|
||||
/// information from the document found and scored by the query.
|
||||
@@ -358,7 +358,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl_downcast!(Fruit);
|
||||
#[allow(missing_docs)]
|
||||
mod downcast_impl {
|
||||
downcast!(super::Fruit);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::Collector;
|
||||
use super::SegmentCollector;
|
||||
use collector::Fruit;
|
||||
use downcast::Downcast;
|
||||
use std::marker::PhantomData;
|
||||
use DocId;
|
||||
use Result;
|
||||
@@ -36,10 +37,11 @@ impl<TCollector: Collector> Collector for CollectorWrapper<TCollector> {
|
||||
let typed_fruit: Vec<TCollector::Fruit> = children
|
||||
.into_iter()
|
||||
.map(|untyped_fruit| {
|
||||
untyped_fruit.downcast::<TCollector::Fruit>()
|
||||
Downcast::<TCollector::Fruit>::downcast(untyped_fruit)
|
||||
.map(|boxed_but_typed| *boxed_but_typed)
|
||||
.map_err(|_| {
|
||||
TantivyError::InvalidArgument("Failed to cast child fruit.".to_string())
|
||||
.map_err(|e| {
|
||||
let err_msg = format!("Failed to cast child collector fruit. {:?}", e);
|
||||
TantivyError::InvalidArgument(err_msg)
|
||||
})
|
||||
})
|
||||
.collect::<Result<_>>()?;
|
||||
@@ -87,17 +89,14 @@ pub struct FruitHandle<TFruit: Fruit> {
|
||||
impl<TFruit: Fruit> FruitHandle<TFruit> {
|
||||
pub fn extract(self, fruits: &mut MultiFruit) -> TFruit {
|
||||
let boxed_fruit = fruits.sub_fruits[self.pos].take().expect("");
|
||||
*boxed_fruit.downcast::<TFruit>().map_err(|_| ()).expect("Failed to downcast collector fruit.")
|
||||
*Downcast::<TFruit>::downcast(boxed_fruit).expect("Failed")
|
||||
}
|
||||
}
|
||||
|
||||
/// Multicollector makes it possible to collect on more than one collector.
|
||||
/// It should only be used for use cases where the Collector types is unknown
|
||||
/// at compile time.
|
||||
///
|
||||
/// If the type of the collectors is known, you can just group yours collectors
|
||||
/// in a tuple. See the
|
||||
/// [Combining several collectors section of the collector documentation](./index.html#combining-several-collectors).
|
||||
/// If the type of the collectors is known, you should prefer to use `ChainedCollector`.
|
||||
///
|
||||
/// ```rust
|
||||
/// #[macro_use]
|
||||
|
||||
@@ -142,7 +142,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"Hello happy tax payer."));
|
||||
index_writer.add_document(doc!(text_field=>"Droopy says hello happy tax payer"));
|
||||
index_writer.add_document(doc!(text_field=>"I like Droopy"));
|
||||
|
||||
@@ -104,6 +104,31 @@ where
|
||||
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
|
||||
val_shifted & mask
|
||||
}
|
||||
|
||||
/// Reads a range of values from the fast field.
|
||||
///
|
||||
/// The range of values read is from
|
||||
/// `[start..start + output.len()[`
|
||||
pub fn get_range(&self, start: u32, output: &mut [u64]) {
|
||||
if self.num_bits == 0 {
|
||||
for val in output.iter_mut() {
|
||||
*val = 0u64;
|
||||
}
|
||||
} else {
|
||||
let data: &[u8] = &*self.data;
|
||||
let num_bits = self.num_bits;
|
||||
let mask = self.mask;
|
||||
let mut addr_in_bits = (start as usize) * num_bits;
|
||||
for output_val in output.iter_mut() {
|
||||
let addr = addr_in_bits >> 3;
|
||||
let bit_shift = addr_in_bits & 7;
|
||||
let val_unshifted_unmasked: u64 = LittleEndian::read_u64(&data[addr..]);
|
||||
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
|
||||
*output_val = val_shifted & mask;
|
||||
addr_in_bits += num_bits;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -141,4 +166,17 @@ mod test {
|
||||
test_bitpacker_util(6, 14);
|
||||
test_bitpacker_util(1000, 14);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bitpacker_range() {
|
||||
let (bitunpacker, vals) = create_fastfield_bitpacker(100_000, 12);
|
||||
let buffer_len = 100;
|
||||
let mut buffer = vec![0u64; buffer_len];
|
||||
for start in vec![0, 10, 20, 100, 1_000] {
|
||||
bitunpacker.get_range(start as u32, &mut buffer[..]);
|
||||
for i in 0..buffer_len {
|
||||
assert_eq!(buffer[i], vals[start + i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ pub(crate) use self::bitset::TinySet;
|
||||
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
|
||||
pub use self::counting_writer::CountingWriter;
|
||||
pub use self::serialize::{BinarySerializable, FixedSize};
|
||||
pub use self::vint::{read_u32_vint, serialize_vint_u32, write_u32_vint, VInt};
|
||||
pub use self::vint::VInt;
|
||||
pub use byteorder::LittleEndian as Endianness;
|
||||
|
||||
use std::io;
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use super::BinarySerializable;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use std::io;
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
@@ -10,100 +9,6 @@ pub struct VInt(pub u64);
|
||||
|
||||
const STOP_BIT: u8 = 128;
|
||||
|
||||
pub fn serialize_vint_u32(val: u32) -> (u64, usize) {
|
||||
const START_2: u64 = 1 << 7;
|
||||
const START_3: u64 = 1 << 14;
|
||||
const START_4: u64 = 1 << 21;
|
||||
const START_5: u64 = 1 << 28;
|
||||
|
||||
const STOP_1: u64 = START_2 - 1;
|
||||
const STOP_2: u64 = START_3 - 1;
|
||||
const STOP_3: u64 = START_4 - 1;
|
||||
const STOP_4: u64 = START_5 - 1;
|
||||
|
||||
const MASK_1: u64 = 127;
|
||||
const MASK_2: u64 = MASK_1 << 7;
|
||||
const MASK_3: u64 = MASK_2 << 7;
|
||||
const MASK_4: u64 = MASK_3 << 7;
|
||||
const MASK_5: u64 = MASK_4 << 7;
|
||||
|
||||
let val = u64::from(val);
|
||||
const STOP_BIT: u64 = 128u64;
|
||||
match val {
|
||||
0...STOP_1 => (val | STOP_BIT, 1),
|
||||
START_2...STOP_2 => (
|
||||
(val & MASK_1) | ((val & MASK_2) << 1) | (STOP_BIT << (8)),
|
||||
2,
|
||||
),
|
||||
START_3...STOP_3 => (
|
||||
(val & MASK_1) | ((val & MASK_2) << 1) | ((val & MASK_3) << 2) | (STOP_BIT << (8 * 2)),
|
||||
3,
|
||||
),
|
||||
START_4...STOP_4 => (
|
||||
(val & MASK_1)
|
||||
| ((val & MASK_2) << 1)
|
||||
| ((val & MASK_3) << 2)
|
||||
| ((val & MASK_4) << 3)
|
||||
| (STOP_BIT << (8 * 3)),
|
||||
4,
|
||||
),
|
||||
_ => (
|
||||
(val & MASK_1)
|
||||
| ((val & MASK_2) << 1)
|
||||
| ((val & MASK_3) << 2)
|
||||
| ((val & MASK_4) << 3)
|
||||
| ((val & MASK_5) << 4)
|
||||
| (STOP_BIT << (8 * 4)),
|
||||
5,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of bytes covered by a
|
||||
/// serialized vint `u32`.
|
||||
///
|
||||
/// Expects a buffer data that starts
|
||||
/// by the serialized `vint`, scans at most 5 bytes ahead until
|
||||
/// it finds the vint final byte.
|
||||
///
|
||||
/// # May Panic
|
||||
/// If the payload does not start by a valid `vint`
|
||||
fn vint_len(data: &[u8]) -> usize {
|
||||
for (i, &val) in data.iter().enumerate().take(5) {
|
||||
if val >= STOP_BIT {
|
||||
return i + 1;
|
||||
}
|
||||
}
|
||||
panic!("Corrupted data. Invalid VInt 32");
|
||||
}
|
||||
|
||||
/// Reads a vint `u32` from a buffer, and
|
||||
/// consumes its payload data.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If the buffer does not start by a valid
|
||||
/// vint payload
|
||||
pub fn read_u32_vint(data: &mut &[u8]) -> u32 {
|
||||
let vlen = vint_len(*data);
|
||||
let mut result = 0u32;
|
||||
let mut shift = 0u64;
|
||||
for &b in &data[..vlen] {
|
||||
result |= u32::from(b & 127u8) << shift;
|
||||
shift += 7;
|
||||
}
|
||||
*data = &data[vlen..];
|
||||
result
|
||||
}
|
||||
|
||||
/// Write a `u32` as a vint payload.
|
||||
pub fn write_u32_vint<W: io::Write>(val: u32, writer: &mut W) -> io::Result<()> {
|
||||
let (val, num_bytes) = serialize_vint_u32(val);
|
||||
let mut buffer = [0u8; 8];
|
||||
LittleEndian::write_u64(&mut buffer, val);
|
||||
writer.write_all(&buffer[..num_bytes])
|
||||
}
|
||||
|
||||
impl VInt {
|
||||
pub fn val(&self) -> u64 {
|
||||
self.0
|
||||
@@ -119,7 +24,7 @@ impl VInt {
|
||||
output.extend(&buffer[0..num_bytes]);
|
||||
}
|
||||
|
||||
pub fn serialize_into(&self, buffer: &mut [u8; 10]) -> usize {
|
||||
fn serialize_into(&self, buffer: &mut [u8; 10]) -> usize {
|
||||
let mut remaining = self.0;
|
||||
for (i, b) in buffer.iter_mut().enumerate() {
|
||||
let next_byte: u8 = (remaining % 128u64) as u8;
|
||||
@@ -159,7 +64,7 @@ impl BinarySerializable for VInt {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"Reach end of buffer while reading VInt",
|
||||
));
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -169,9 +74,7 @@ impl BinarySerializable for VInt {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::serialize_vint_u32;
|
||||
use super::VInt;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use common::BinarySerializable;
|
||||
|
||||
fn aux_test_vint(val: u64) {
|
||||
@@ -205,28 +108,4 @@ mod tests {
|
||||
}
|
||||
aux_test_vint(10);
|
||||
}
|
||||
|
||||
fn aux_test_serialize_vint_u32(val: u32) {
|
||||
let mut buffer = [0u8; 10];
|
||||
let mut buffer2 = [0u8; 10];
|
||||
let len_vint = VInt(val as u64).serialize_into(&mut buffer);
|
||||
let (vint, len) = serialize_vint_u32(val);
|
||||
assert_eq!(len, len_vint, "len wrong for val {}", val);
|
||||
LittleEndian::write_u64(&mut buffer2, vint);
|
||||
assert_eq!(&buffer[..len], &buffer2[..len], "array wrong for {}", val);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_vint_u32() {
|
||||
aux_test_serialize_vint_u32(0);
|
||||
aux_test_serialize_vint_u32(1);
|
||||
aux_test_serialize_vint_u32(5);
|
||||
for i in 1..3 {
|
||||
let power_of_128 = 1u32 << (7 * i);
|
||||
aux_test_serialize_vint_u32(power_of_128 - 1u32);
|
||||
aux_test_serialize_vint_u32(power_of_128);
|
||||
aux_test_serialize_vint_u32(power_of_128 + 1u32);
|
||||
}
|
||||
aux_test_serialize_vint_u32(u32::max_value());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,7 +64,7 @@ impl Executor {
|
||||
// This is important as it makes it possible for the fruit_receiver iteration to
|
||||
// terminate.
|
||||
};
|
||||
// This is lame, but safe.
|
||||
// This is lame, but it does not use unsafe code.
|
||||
let mut results_with_position = Vec::with_capacity(num_fruits);
|
||||
for (pos, fruit_res) in fruit_receiver {
|
||||
let fruit = fruit_res?;
|
||||
|
||||
@@ -12,14 +12,13 @@ use core::META_FILEPATH;
|
||||
use directory::ManagedDirectory;
|
||||
#[cfg(feature = "mmap")]
|
||||
use directory::MmapDirectory;
|
||||
use directory::INDEX_WRITER_LOCK;
|
||||
use directory::META_LOCK;
|
||||
use directory::{Directory, RAMDirectory};
|
||||
use error::DataCorruption;
|
||||
use error::TantivyError;
|
||||
use indexer::index_writer::open_index_writer;
|
||||
use indexer::index_writer::HEAP_SIZE_MIN;
|
||||
use indexer::segment_updater::save_new_metas;
|
||||
use indexer::LockType;
|
||||
use num_cpus;
|
||||
use schema::Field;
|
||||
use schema::FieldType;
|
||||
@@ -151,7 +150,7 @@ impl Index {
|
||||
///
|
||||
/// This will overwrite existing meta.json
|
||||
fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> Result<Index> {
|
||||
save_new_metas(schema.clone(), directory.borrow_mut())?;
|
||||
save_new_metas(schema.clone(), directory.borrow_mut())?;
|
||||
let metas = IndexMeta::with_schema(schema);
|
||||
Index::create_from_metas(directory, &metas)
|
||||
}
|
||||
@@ -233,8 +232,7 @@ impl Index {
|
||||
/// Each thread will receive a budget of `overall_heap_size_in_bytes / num_threads`.
|
||||
///
|
||||
/// # Errors
|
||||
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IOError`.
|
||||
///
|
||||
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
|
||||
/// # Panics
|
||||
/// If the heap size per thread is too small, panics.
|
||||
pub fn writer_with_num_threads(
|
||||
@@ -242,21 +240,7 @@ impl Index {
|
||||
num_threads: usize,
|
||||
overall_heap_size_in_bytes: usize,
|
||||
) -> Result<IndexWriter> {
|
||||
let directory_lock = self
|
||||
.directory
|
||||
.acquire_lock(&INDEX_WRITER_LOCK)
|
||||
.map_err(|err| {
|
||||
TantivyError::LockFailure(
|
||||
err,
|
||||
Some(
|
||||
"Failed to acquire index lock. If you are using\
|
||||
a regular directory, this means there is already an \
|
||||
`IndexWriter` working on this `Directory`, in this process \
|
||||
or in a different process."
|
||||
.to_string(),
|
||||
),
|
||||
)
|
||||
})?;
|
||||
let directory_lock = LockType::IndexWriterLock.acquire_lock(&self.directory)?;
|
||||
let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
|
||||
open_index_writer(
|
||||
self,
|
||||
@@ -355,7 +339,7 @@ impl Index {
|
||||
/// get the freshest `index` at all time, is to watch `meta.json` and
|
||||
/// call `load_searchers` whenever a changes happen.
|
||||
pub fn load_searchers(&self) -> Result<()> {
|
||||
let _meta_lock = self.directory().acquire_lock(&META_LOCK)?;
|
||||
let _meta_lock = LockType::MetaLock.acquire_lock(self.directory())?;
|
||||
let searchable_segments = self.searchable_segments()?;
|
||||
let segment_readers: Vec<SegmentReader> = searchable_segments
|
||||
.iter()
|
||||
|
||||
@@ -41,6 +41,6 @@ impl SegmentComponent {
|
||||
SegmentComponent::STORE,
|
||||
SegmentComponent::DELETE,
|
||||
];
|
||||
SEGMENT_COMPONENTS.iter()
|
||||
SEGMENT_COMPONENTS.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,102 +1,11 @@
|
||||
use directory::directory_lock::Lock;
|
||||
use directory::error::LockError;
|
||||
use directory::error::{DeleteError, OpenReadError, OpenWriteError};
|
||||
use directory::{ReadOnlySource, WritePtr};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::marker::Send;
|
||||
use std::marker::Sync;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::result;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Retry the logic of acquiring locks is pretty simple.
|
||||
/// We just retry `n` times after a given `duratio`, both
|
||||
/// depending on the type of lock.
|
||||
struct RetryPolicy {
|
||||
num_retries: usize,
|
||||
wait_in_ms: u64,
|
||||
}
|
||||
|
||||
impl RetryPolicy {
|
||||
fn no_retry() -> RetryPolicy {
|
||||
RetryPolicy {
|
||||
num_retries: 0,
|
||||
wait_in_ms: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_and_retry(&mut self) -> bool {
|
||||
if self.num_retries == 0 {
|
||||
false
|
||||
} else {
|
||||
self.num_retries -= 1;
|
||||
let wait_duration = Duration::from_millis(self.wait_in_ms);
|
||||
thread::sleep(wait_duration);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The `DirectoryLock` is an object that represents a file lock.
|
||||
/// See [`LockType`](struct.LockType.html)
|
||||
///
|
||||
/// It is transparently associated to a lock file, that gets deleted
|
||||
/// on `Drop.` The lock is released automatically on `Drop`.
|
||||
pub struct DirectoryLock(Box<Drop + Send + 'static>);
|
||||
|
||||
struct DirectoryLockGuard {
|
||||
directory: Box<Directory>,
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl<T: Drop + Send + 'static> From<Box<T>> for DirectoryLock {
|
||||
fn from(underlying: Box<T>) -> Self {
|
||||
DirectoryLock(underlying)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DirectoryLockGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = self.directory.delete(&*self.path) {
|
||||
error!("Failed to remove the lock file. {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum TryAcquireLockError {
|
||||
FileExists,
|
||||
IOError(io::Error),
|
||||
}
|
||||
|
||||
fn try_acquire_lock(
|
||||
filepath: &Path,
|
||||
directory: &mut Directory,
|
||||
) -> Result<DirectoryLock, TryAcquireLockError> {
|
||||
let mut write = directory.open_write(filepath).map_err(|e| match e {
|
||||
OpenWriteError::FileAlreadyExists(_) => TryAcquireLockError::FileExists,
|
||||
OpenWriteError::IOError(io_error) => TryAcquireLockError::IOError(io_error.into()),
|
||||
})?;
|
||||
write.flush().map_err(TryAcquireLockError::IOError)?;
|
||||
Ok(DirectoryLock::from(Box::new(DirectoryLockGuard {
|
||||
directory: directory.box_clone(),
|
||||
path: filepath.to_owned(),
|
||||
})))
|
||||
}
|
||||
|
||||
fn retry_policy(is_blocking: bool) -> RetryPolicy {
|
||||
if is_blocking {
|
||||
RetryPolicy {
|
||||
num_retries: 100,
|
||||
wait_in_ms: 100,
|
||||
}
|
||||
} else {
|
||||
RetryPolicy::no_retry()
|
||||
}
|
||||
}
|
||||
|
||||
/// Write-once read many (WORM) abstraction for where
|
||||
/// tantivy's data should be stored.
|
||||
@@ -164,29 +73,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
///
|
||||
/// The file may or may not previously exist.
|
||||
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>;
|
||||
|
||||
/// Acquire a lock in the given directory.
|
||||
///
|
||||
/// The method is blocking or not depending on the `Lock` object.
|
||||
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
|
||||
let mut box_directory = self.box_clone();
|
||||
let mut retry_policy = retry_policy(lock.is_blocking);
|
||||
loop {
|
||||
match try_acquire_lock(&lock.filepath, &mut *box_directory) {
|
||||
Ok(result) => {
|
||||
return Ok(result);
|
||||
}
|
||||
Err(TryAcquireLockError::FileExists) => {
|
||||
if !retry_policy.wait_and_retry() {
|
||||
return Err(LockError::LockBusy);
|
||||
}
|
||||
}
|
||||
Err(TryAcquireLockError::IOError(io_error)) => {
|
||||
return Err(LockError::IOError(io_error));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// DirectoryClone
|
||||
|
||||
@@ -1,56 +0,0 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// A directory lock.
|
||||
///
|
||||
/// A lock is associated to a specific path and some
|
||||
/// [`LockParams`](./enum.LockParams.html).
|
||||
/// Tantivy itself uses only two locks but client application
|
||||
/// can use the directory facility to define their own locks.
|
||||
/// - [INDEX_WRITER_LOCK](./struct.INDEX_WRITER_LOCK.html)
|
||||
/// - [META_LOCK](./struct.META_LOCK.html)
|
||||
///
|
||||
/// Check out these locks documentation for more information.
|
||||
///
|
||||
#[derive(Debug)]
|
||||
pub struct Lock {
|
||||
/// The lock needs to be associated with its own file `path`.
|
||||
/// Depending on the platform, the lock might rely on the creation
|
||||
/// and deletion of this filepath.
|
||||
pub filepath: PathBuf,
|
||||
/// `lock_params` describes whether acquiring the lock is meant
|
||||
/// to be a blocking operation or a non-blocking.
|
||||
///
|
||||
/// Acquiring a blocking lock blocks until the lock is
|
||||
/// available.
|
||||
/// Acquiring a blocking lock returns rapidly, either successfully
|
||||
/// or with an error signifying that someone is already holding
|
||||
/// the lock.
|
||||
pub is_blocking: bool,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
/// Only one process should be able to write tantivy's index at a time.
|
||||
/// This lock file, when present, is in charge of preventing other processes to open an IndexWriter.
|
||||
///
|
||||
/// If the process is killed and this file remains, it is safe to remove it manually.
|
||||
///
|
||||
/// Failing to acquire this lock usually means a misuse of tantivy's API,
|
||||
/// (creating more than one instance of the `IndexWriter`), are a spurious
|
||||
/// lock file remaining after a crash. In the latter case, removing the file after
|
||||
/// checking no process running tantivy is running is safe.
|
||||
pub static ref INDEX_WRITER_LOCK: Lock = Lock {
|
||||
filepath: PathBuf::from(".tantivy-writer.lock"),
|
||||
is_blocking: false
|
||||
};
|
||||
/// The meta lock file is here to protect the segment files being opened by
|
||||
/// `.load_searchers()` from being garbage collected.
|
||||
/// It makes it possible for another process to safely consume
|
||||
/// our index in-writing. Ideally, we may have prefered `RWLock` semantics
|
||||
/// here, but it is difficult to achieve on Windows.
|
||||
///
|
||||
/// Opening segment readers is a very fast process.
|
||||
pub static ref META_LOCK: Lock = Lock {
|
||||
filepath: PathBuf::from(".tantivy-meta.lock"),
|
||||
is_blocking: true
|
||||
};
|
||||
}
|
||||
@@ -3,22 +3,6 @@ use std::fmt;
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Error while trying to acquire a directory lock.
|
||||
#[derive(Debug, Fail)]
|
||||
pub enum LockError {
|
||||
/// Failed to acquired a lock as it is already hold by another
|
||||
/// client.
|
||||
/// - In the context of a blocking lock, this means the lock was not released within some `timeout` period.
|
||||
/// - In the context of a non-blocking lock, this means the lock was busy at the moment of the call.
|
||||
#[fail(
|
||||
display = "Could not acquire lock as it is already held, possibly by a different process."
|
||||
)]
|
||||
LockBusy,
|
||||
/// Trying to acquire a lock failed with an `IOError`
|
||||
#[fail(display = "Failed to acquire the lock due to an io:Error.")]
|
||||
IOError(io::Error),
|
||||
}
|
||||
|
||||
/// General IO error with an optional path to the offending file.
|
||||
#[derive(Debug)]
|
||||
pub struct IOError {
|
||||
@@ -26,12 +10,6 @@ pub struct IOError {
|
||||
err: io::Error,
|
||||
}
|
||||
|
||||
impl Into<io::Error> for IOError {
|
||||
fn into(self) -> io::Error {
|
||||
self.err
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for IOError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self.path {
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
use core::MANAGED_FILEPATH;
|
||||
use directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError};
|
||||
use directory::DirectoryLock;
|
||||
use directory::Lock;
|
||||
use directory::META_LOCK;
|
||||
use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
|
||||
use directory::{ReadOnlySource, WritePtr};
|
||||
use error::DataCorruption;
|
||||
use indexer::LockType;
|
||||
use serde_json;
|
||||
use std::collections::HashSet;
|
||||
use std::io;
|
||||
@@ -94,9 +92,6 @@ impl ManagedDirectory {
|
||||
///
|
||||
/// * `living_files` - List of files that are still used by the index.
|
||||
///
|
||||
/// The use a callback ensures that the list of living_files is computed
|
||||
/// while we hold the lock on meta.
|
||||
///
|
||||
/// This method does not panick nor returns errors.
|
||||
/// If a file cannot be deleted (for permission reasons for instance)
|
||||
/// an error is simply logged, and the file remains in the list of managed
|
||||
@@ -127,7 +122,7 @@ impl ManagedDirectory {
|
||||
// 2) writer change meta.json (for instance after a merge or a commit)
|
||||
// 3) gc kicks in.
|
||||
// 4) gc removes a file that was useful for process B, before process B opened it.
|
||||
if let Ok(_meta_lock) = self.acquire_lock(&META_LOCK) {
|
||||
if let Ok(_meta_lock) = LockType::MetaLock.acquire_lock(self) {
|
||||
let living_files = get_living_files();
|
||||
for managed_path in &meta_informations_rlock.managed_paths {
|
||||
if !living_files.contains(managed_path) {
|
||||
@@ -237,10 +232,6 @@ impl Directory for ManagedDirectory {
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
self.directory.exists(path)
|
||||
}
|
||||
|
||||
fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
|
||||
self.directory.acquire_lock(lock)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for ManagedDirectory {
|
||||
|
||||
@@ -1,14 +1,8 @@
|
||||
extern crate fs2;
|
||||
|
||||
use self::fs2::FileExt;
|
||||
use atomicwrites;
|
||||
use common::make_io_err;
|
||||
use directory::error::LockError;
|
||||
use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
|
||||
use directory::shared_vec_slice::SharedVecSlice;
|
||||
use directory::Directory;
|
||||
use directory::DirectoryLock;
|
||||
use directory::Lock;
|
||||
use directory::ReadOnlySource;
|
||||
use directory::WritePtr;
|
||||
use fst::raw::MmapReadOnly;
|
||||
@@ -121,14 +115,6 @@ impl MmapCache {
|
||||
///
|
||||
/// The Mmap object are cached to limit the
|
||||
/// system calls.
|
||||
///
|
||||
/// In the `MmapDirectory`, locks are implemented using the `fs2` crate definition of locks.
|
||||
///
|
||||
/// On MacOS & linux, it relies on `flock` (aka `BSD Lock`). These locks solve most of the
|
||||
/// problems related to POSIX Locks, but may their contract may not be respected on `NFS`
|
||||
/// depending on the implementation.
|
||||
///
|
||||
/// On Windows the semantics are again different.
|
||||
#[derive(Clone)]
|
||||
pub struct MmapDirectory {
|
||||
root_path: PathBuf,
|
||||
@@ -227,21 +213,6 @@ impl MmapDirectory {
|
||||
}
|
||||
}
|
||||
|
||||
/// We rely on fs2 for file locking. On Windows & MacOS this
|
||||
/// uses BSD locks (`flock`). The lock is actually released when
|
||||
/// the `File` object is dropped and its associated file descriptor
|
||||
/// is closed.
|
||||
struct ReleaseLockFile {
|
||||
_file: File,
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl Drop for ReleaseLockFile {
|
||||
fn drop(&mut self) {
|
||||
debug!("Releasing lock {:?}", self.path);
|
||||
}
|
||||
}
|
||||
|
||||
/// This Write wraps a File, but has the specificity of
|
||||
/// call `sync_all` on flush.
|
||||
struct SafeFileWriter(File);
|
||||
@@ -383,26 +354,6 @@ impl Directory for MmapDirectory {
|
||||
meta_file.write(|f| f.write_all(data))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
|
||||
let full_path = self.resolve_path(&lock.filepath);
|
||||
// We make sure that the file exists.
|
||||
let file: File = OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true) //< if the file does not exist yet, create it.
|
||||
.open(&full_path)
|
||||
.map_err(LockError::IOError)?;
|
||||
if lock.is_blocking {
|
||||
file.lock_exclusive().map_err(LockError::IOError)?;
|
||||
} else {
|
||||
file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
|
||||
}
|
||||
// dropping the file handle will release the lock.
|
||||
Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
|
||||
path: lock.filepath.clone(),
|
||||
_file: file,
|
||||
})))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -8,7 +8,6 @@ WORM directory abstraction.
|
||||
mod mmap_directory;
|
||||
|
||||
mod directory;
|
||||
mod directory_lock;
|
||||
mod managed_directory;
|
||||
mod ram_directory;
|
||||
mod read_only_source;
|
||||
@@ -17,12 +16,11 @@ mod shared_vec_slice;
|
||||
/// Errors specific to the directory module.
|
||||
pub mod error;
|
||||
|
||||
pub use self::directory::DirectoryLock;
|
||||
use std::io::{BufWriter, Seek, Write};
|
||||
|
||||
pub use self::directory::{Directory, DirectoryClone};
|
||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
||||
pub use self::ram_directory::RAMDirectory;
|
||||
pub use self::read_only_source::ReadOnlySource;
|
||||
use std::io::{BufWriter, Seek, Write};
|
||||
|
||||
#[cfg(feature = "mmap")]
|
||||
pub use self::mmap_directory::MmapDirectory;
|
||||
@@ -40,4 +38,128 @@ impl<T: Seek + Write> SeekableWrite for T {}
|
||||
pub type WritePtr = BufWriter<Box<SeekableWrite>>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
use std::io::{Seek, SeekFrom, Write};
|
||||
use std::path::Path;
|
||||
|
||||
lazy_static! {
|
||||
static ref TEST_PATH: &'static Path = Path::new("some_path_for_test");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ram_directory() {
|
||||
let mut ram_directory = RAMDirectory::create();
|
||||
test_directory(&mut ram_directory);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(feature = "mmap")]
|
||||
fn test_mmap_directory() {
|
||||
let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
|
||||
test_directory(&mut mmap_directory);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn ram_directory_panics_if_flush_forgotten() {
|
||||
let mut ram_directory = RAMDirectory::create();
|
||||
let mut write_file = ram_directory.open_write(*TEST_PATH).unwrap();
|
||||
assert!(write_file.write_all(&[4]).is_ok());
|
||||
}
|
||||
|
||||
fn test_simple(directory: &mut Directory) {
|
||||
{
|
||||
{
|
||||
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
|
||||
assert!(directory.exists(*TEST_PATH));
|
||||
write_file.write_all(&[4]).unwrap();
|
||||
write_file.write_all(&[3]).unwrap();
|
||||
write_file.write_all(&[7, 3, 5]).unwrap();
|
||||
write_file.flush().unwrap();
|
||||
}
|
||||
let read_file = directory.open_read(*TEST_PATH).unwrap();
|
||||
let data: &[u8] = &*read_file;
|
||||
assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]);
|
||||
}
|
||||
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
assert!(!directory.exists(*TEST_PATH));
|
||||
}
|
||||
|
||||
fn test_seek(directory: &mut Directory) {
|
||||
{
|
||||
{
|
||||
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
|
||||
write_file.write_all(&[4, 3, 7, 3, 5]).unwrap();
|
||||
write_file.seek(SeekFrom::Start(0)).unwrap();
|
||||
write_file.write_all(&[3, 1]).unwrap();
|
||||
write_file.flush().unwrap();
|
||||
}
|
||||
let read_file = directory.open_read(*TEST_PATH).unwrap();
|
||||
let data: &[u8] = &*read_file;
|
||||
assert_eq!(data, &[3u8, 1u8, 7u8, 3u8, 5u8]);
|
||||
}
|
||||
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
}
|
||||
|
||||
fn test_rewrite_forbidden(directory: &mut Directory) {
|
||||
{
|
||||
directory.open_write(*TEST_PATH).unwrap();
|
||||
assert!(directory.exists(*TEST_PATH));
|
||||
}
|
||||
{
|
||||
assert!(directory.open_write(*TEST_PATH).is_err());
|
||||
}
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
}
|
||||
|
||||
fn test_write_create_the_file(directory: &mut Directory) {
|
||||
{
|
||||
assert!(directory.open_read(*TEST_PATH).is_err());
|
||||
let _w = directory.open_write(*TEST_PATH).unwrap();
|
||||
assert!(directory.exists(*TEST_PATH));
|
||||
assert!(directory.open_read(*TEST_PATH).is_ok());
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
fn test_directory_delete(directory: &mut Directory) {
|
||||
assert!(directory.open_read(*TEST_PATH).is_err());
|
||||
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
|
||||
write_file.write_all(&[1, 2, 3, 4]).unwrap();
|
||||
write_file.flush().unwrap();
|
||||
{
|
||||
let read_handle = directory.open_read(*TEST_PATH).unwrap();
|
||||
{
|
||||
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
|
||||
|
||||
// Mapped files can't be deleted on Windows
|
||||
if !cfg!(windows) {
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
|
||||
}
|
||||
|
||||
assert!(directory.delete(Path::new("SomeOtherPath")).is_err());
|
||||
}
|
||||
}
|
||||
|
||||
if cfg!(windows) {
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
}
|
||||
|
||||
assert!(directory.open_read(*TEST_PATH).is_err());
|
||||
assert!(directory.delete(*TEST_PATH).is_err());
|
||||
}
|
||||
|
||||
fn test_directory(directory: &mut Directory) {
|
||||
test_simple(directory);
|
||||
test_seek(directory);
|
||||
test_rewrite_forbidden(directory);
|
||||
test_write_create_the_file(directory);
|
||||
test_directory_delete(directory);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,182 +0,0 @@
|
||||
use super::*;
|
||||
use std::io::{Seek, SeekFrom, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time;
|
||||
|
||||
lazy_static! {
|
||||
static ref TEST_PATH: &'static Path = Path::new("some_path_for_test");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ram_directory() {
|
||||
let mut ram_directory = RAMDirectory::create();
|
||||
test_directory(&mut ram_directory);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(feature = "mmap")]
|
||||
fn test_mmap_directory() {
|
||||
let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
|
||||
test_directory(&mut mmap_directory);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn ram_directory_panics_if_flush_forgotten() {
|
||||
let mut ram_directory = RAMDirectory::create();
|
||||
let mut write_file = ram_directory.open_write(*TEST_PATH).unwrap();
|
||||
assert!(write_file.write_all(&[4]).is_ok());
|
||||
}
|
||||
|
||||
fn test_simple(directory: &mut Directory) {
|
||||
{
|
||||
{
|
||||
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
|
||||
assert!(directory.exists(*TEST_PATH));
|
||||
write_file.write_all(&[4]).unwrap();
|
||||
write_file.write_all(&[3]).unwrap();
|
||||
write_file.write_all(&[7, 3, 5]).unwrap();
|
||||
write_file.flush().unwrap();
|
||||
}
|
||||
let read_file = directory.open_read(*TEST_PATH).unwrap();
|
||||
let data: &[u8] = &*read_file;
|
||||
assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]);
|
||||
}
|
||||
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
assert!(!directory.exists(*TEST_PATH));
|
||||
}
|
||||
|
||||
fn test_seek(directory: &mut Directory) {
|
||||
{
|
||||
{
|
||||
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
|
||||
write_file.write_all(&[4, 3, 7, 3, 5]).unwrap();
|
||||
write_file.seek(SeekFrom::Start(0)).unwrap();
|
||||
write_file.write_all(&[3, 1]).unwrap();
|
||||
write_file.flush().unwrap();
|
||||
}
|
||||
let read_file = directory.open_read(*TEST_PATH).unwrap();
|
||||
let data: &[u8] = &*read_file;
|
||||
assert_eq!(data, &[3u8, 1u8, 7u8, 3u8, 5u8]);
|
||||
}
|
||||
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
}
|
||||
|
||||
fn test_rewrite_forbidden(directory: &mut Directory) {
|
||||
{
|
||||
directory.open_write(*TEST_PATH).unwrap();
|
||||
assert!(directory.exists(*TEST_PATH));
|
||||
}
|
||||
{
|
||||
assert!(directory.open_write(*TEST_PATH).is_err());
|
||||
}
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
}
|
||||
|
||||
fn test_write_create_the_file(directory: &mut Directory) {
|
||||
{
|
||||
assert!(directory.open_read(*TEST_PATH).is_err());
|
||||
let _w = directory.open_write(*TEST_PATH).unwrap();
|
||||
assert!(directory.exists(*TEST_PATH));
|
||||
assert!(directory.open_read(*TEST_PATH).is_ok());
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
fn test_directory_delete(directory: &mut Directory) {
|
||||
assert!(directory.open_read(*TEST_PATH).is_err());
|
||||
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
|
||||
write_file.write_all(&[1, 2, 3, 4]).unwrap();
|
||||
write_file.flush().unwrap();
|
||||
{
|
||||
let read_handle = directory.open_read(*TEST_PATH).unwrap();
|
||||
{
|
||||
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
|
||||
|
||||
// Mapped files can't be deleted on Windows
|
||||
if !cfg!(windows) {
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
|
||||
}
|
||||
|
||||
assert!(directory.delete(Path::new("SomeOtherPath")).is_err());
|
||||
}
|
||||
}
|
||||
|
||||
if cfg!(windows) {
|
||||
assert!(directory.delete(*TEST_PATH).is_ok());
|
||||
}
|
||||
|
||||
assert!(directory.open_read(*TEST_PATH).is_err());
|
||||
assert!(directory.delete(*TEST_PATH).is_err());
|
||||
}
|
||||
|
||||
fn test_directory(directory: &mut Directory) {
|
||||
test_simple(directory);
|
||||
test_seek(directory);
|
||||
test_rewrite_forbidden(directory);
|
||||
test_write_create_the_file(directory);
|
||||
test_directory_delete(directory);
|
||||
test_lock_non_blocking(directory);
|
||||
test_lock_blocking(directory);
|
||||
}
|
||||
|
||||
fn test_lock_non_blocking(directory: &mut Directory) {
|
||||
{
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_a_res.is_ok());
|
||||
let lock_b_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("b.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_b_res.is_ok());
|
||||
let lock_a_res2 = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_a_res2.is_err());
|
||||
}
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_a_res.is_ok());
|
||||
}
|
||||
|
||||
fn test_lock_blocking(directory: &mut Directory) {
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: true,
|
||||
});
|
||||
assert!(lock_a_res.is_ok());
|
||||
std::thread::spawn(move || {
|
||||
//< lock_a_res is sent to the thread.
|
||||
std::thread::sleep(time::Duration::from_millis(10));
|
||||
// explicitely droping lock_a_res. It would have been sufficient to just force it
|
||||
// to be part of the move, but the intent seems clearer that way.
|
||||
drop(lock_a_res);
|
||||
});
|
||||
{
|
||||
// A non-blocking call should fail, as the thread is running and holding the lock.
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_a_res.is_err());
|
||||
}
|
||||
{
|
||||
// the blocking call should wait for at least 10ms.
|
||||
let start = time::Instant::now();
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: true,
|
||||
});
|
||||
assert!(lock_a_res.is_ok());
|
||||
assert!(start.elapsed().subsec_millis() >= 10);
|
||||
}
|
||||
}
|
||||
15
src/error.rs
15
src/error.rs
@@ -2,9 +2,9 @@
|
||||
|
||||
use std::io;
|
||||
|
||||
use directory::error::LockError;
|
||||
use directory::error::{IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
|
||||
use fastfield::FastFieldNotAvailableError;
|
||||
use indexer::LockType;
|
||||
use query;
|
||||
use schema;
|
||||
use serde_json;
|
||||
@@ -57,8 +57,11 @@ pub enum TantivyError {
|
||||
#[fail(display = "Index already exists")]
|
||||
IndexAlreadyExists,
|
||||
/// Failed to acquire file lock
|
||||
#[fail(display = "Failed to acquire Lockfile: {:?}. {:?}", _0, _1)]
|
||||
LockFailure(LockError, Option<String>),
|
||||
#[fail(
|
||||
display = "Failed to acquire Lockfile: {:?}. Possible causes: another IndexWriter instance or panic during previous lock drop.",
|
||||
_0
|
||||
)]
|
||||
LockFailure(LockType),
|
||||
/// IO Error.
|
||||
#[fail(display = "An IO error occurred: '{}'", _0)]
|
||||
IOError(#[cause] IOError),
|
||||
@@ -97,12 +100,6 @@ impl From<FastFieldNotAvailableError> for TantivyError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LockError> for TantivyError {
|
||||
fn from(lock_error: LockError) -> TantivyError {
|
||||
TantivyError::LockFailure(lock_error, None)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<IOError> for TantivyError {
|
||||
fn from(io_error: IOError) -> TantivyError {
|
||||
TantivyError::IOError(io_error)
|
||||
|
||||
@@ -79,8 +79,11 @@ impl<Item: FastValue> FastFieldReader<Item> {
|
||||
// TODO change start to `u64`.
|
||||
// For multifastfield, start is an index in a second fastfield, not a `DocId`
|
||||
pub fn get_range(&self, start: u32, output: &mut [Item]) {
|
||||
for (i, out) in output.iter_mut().enumerate() {
|
||||
*out = self.get(start + i as u32);
|
||||
// ok: Item is either `u64` or `i64`
|
||||
let output_u64: &mut [u64] = unsafe { &mut *(output as *mut [Item] as *mut [u64]) };
|
||||
self.bit_unpacker.get_range(start, output_u64);
|
||||
for out in output_u64.iter_mut() {
|
||||
*out = Item::from_u64(*out + self.min_value_u64).as_u64();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
131
src/indexer/directory_lock.rs
Normal file
131
src/indexer/directory_lock.rs
Normal file
@@ -0,0 +1,131 @@
|
||||
use directory::error::OpenWriteError;
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use Directory;
|
||||
use TantivyError;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum LockType {
|
||||
/// Only one process should be able to write tantivy's index at a time.
|
||||
/// This lock file, when present, is in charge of preventing other processes to open an IndexWriter.
|
||||
///
|
||||
/// If the process is killed and this file remains, it is safe to remove it manually.
|
||||
///
|
||||
/// Failing to acquire this lock usually means a misuse of tantivy's API,
|
||||
/// (creating more than one instance of the `IndexWriter`), are a spurious
|
||||
/// lock file remaining after a crash. In the latter case, removing the file after
|
||||
/// checking no process running tantivy is running is safe.
|
||||
IndexWriterLock,
|
||||
/// The meta lock file is here to protect the segment files being opened by
|
||||
/// `.load_searchers()` from being garbage collected.
|
||||
/// It makes it possible for another process to safely consume
|
||||
/// our index in-writing. Ideally, we may have prefered `RWLock` semantics
|
||||
/// here, but it is difficult to achieve on Windows.
|
||||
///
|
||||
/// Opening segment readers is a very fast process.
|
||||
/// Right now if the lock cannot be acquire on the first attempt, the logic
|
||||
/// is very simplistic. We retry after `100ms` until we effectively
|
||||
/// acquire the lock.
|
||||
/// This lock should not have much contention in normal usage.
|
||||
MetaLock,
|
||||
}
|
||||
|
||||
/// Retry the logic of acquiring locks is pretty simple.
|
||||
/// We just retry `n` times after a given `duratio`, both
|
||||
/// depending on the type of lock.
|
||||
struct RetryPolicy {
|
||||
num_retries: usize,
|
||||
wait_in_ms: u64,
|
||||
}
|
||||
|
||||
impl RetryPolicy {
|
||||
fn no_retry() -> RetryPolicy {
|
||||
RetryPolicy {
|
||||
num_retries: 0,
|
||||
wait_in_ms: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_and_retry(&mut self) -> bool {
|
||||
if self.num_retries == 0 {
|
||||
false
|
||||
} else {
|
||||
self.num_retries -= 1;
|
||||
let wait_duration = Duration::from_millis(self.wait_in_ms);
|
||||
thread::sleep(wait_duration);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LockType {
|
||||
fn retry_policy(self) -> RetryPolicy {
|
||||
match self {
|
||||
LockType::IndexWriterLock => RetryPolicy::no_retry(),
|
||||
LockType::MetaLock => RetryPolicy {
|
||||
num_retries: 100,
|
||||
wait_in_ms: 100,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn try_acquire_lock(self, directory: &mut Directory) -> Result<DirectoryLock, TantivyError> {
|
||||
let path = self.filename();
|
||||
let mut write = directory.open_write(path).map_err(|e| match e {
|
||||
OpenWriteError::FileAlreadyExists(_) => TantivyError::LockFailure(self),
|
||||
OpenWriteError::IOError(io_error) => TantivyError::IOError(io_error),
|
||||
})?;
|
||||
write.flush()?;
|
||||
Ok(DirectoryLock {
|
||||
directory: directory.box_clone(),
|
||||
path: path.to_owned(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Acquire a lock in the given directory.
|
||||
pub fn acquire_lock(self, directory: &Directory) -> Result<DirectoryLock, TantivyError> {
|
||||
let mut box_directory = directory.box_clone();
|
||||
let mut retry_policy = self.retry_policy();
|
||||
loop {
|
||||
let lock_result = self.try_acquire_lock(&mut *box_directory);
|
||||
match lock_result {
|
||||
Ok(result) => {
|
||||
return Ok(result);
|
||||
}
|
||||
Err(TantivyError::LockFailure(ref filepath)) => {
|
||||
if !retry_policy.wait_and_retry() {
|
||||
return Err(TantivyError::LockFailure(filepath.to_owned()));
|
||||
}
|
||||
}
|
||||
Err(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn filename(&self) -> &Path {
|
||||
match *self {
|
||||
LockType::MetaLock => Path::new(".tantivy-meta.lock"),
|
||||
LockType::IndexWriterLock => Path::new(".tantivy-indexer.lock"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The `DirectoryLock` is an object that represents a file lock.
|
||||
/// See [`LockType`](struct.LockType.html)
|
||||
///
|
||||
/// It is transparently associated to a lock file, that gets deleted
|
||||
/// on `Drop.` The lock is release automatically on `Drop`.
|
||||
pub struct DirectoryLock {
|
||||
directory: Box<Directory>,
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl Drop for DirectoryLock {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = self.directory.delete(&*self.path) {
|
||||
error!("Failed to remove the lock file. {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,15 +9,15 @@ use core::SegmentId;
|
||||
use core::SegmentMeta;
|
||||
use core::SegmentReader;
|
||||
use crossbeam::channel;
|
||||
use directory::DirectoryLock;
|
||||
use docset::DocSet;
|
||||
use error::TantivyError;
|
||||
use fastfield::write_delete_bitset;
|
||||
use futures::{Canceled, Future};
|
||||
use futures::sync::oneshot::Receiver;
|
||||
use indexer::delete_queue::{DeleteCursor, DeleteQueue};
|
||||
use indexer::doc_opstamp_mapping::DocToOpstampMapping;
|
||||
use indexer::operation::DeleteOperation;
|
||||
use indexer::stamper::Stamper;
|
||||
use indexer::DirectoryLock;
|
||||
use indexer::MergePolicy;
|
||||
use indexer::SegmentEntry;
|
||||
use indexer::SegmentWriter;
|
||||
@@ -26,7 +26,7 @@ use schema::Document;
|
||||
use schema::IndexRecordOption;
|
||||
use schema::Term;
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
use std::mem::swap;
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
use Result;
|
||||
@@ -52,19 +52,17 @@ type DocumentReceiver = channel::Receiver<AddOperation>;
|
||||
///
|
||||
/// Returns (the heap size in bytes, the hash table size in number of bits)
|
||||
fn initial_table_size(per_thread_memory_budget: usize) -> usize {
|
||||
assert!(per_thread_memory_budget > 1_000);
|
||||
let table_size_limit: usize = per_thread_memory_budget / 3;
|
||||
if let Some(limit) = (1..)
|
||||
(1..)
|
||||
.take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit)
|
||||
.last()
|
||||
{
|
||||
limit.min(19) // we cap it at 2^19 = 512K.
|
||||
} else {
|
||||
unreachable!(
|
||||
"Per thread memory is too small: {}",
|
||||
per_thread_memory_budget
|
||||
);
|
||||
}
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"Per thread memory is too small: {}",
|
||||
per_thread_memory_budget
|
||||
)
|
||||
})
|
||||
.min(19) // we cap it at 512K
|
||||
}
|
||||
|
||||
/// `IndexWriter` is the user entry-point to add document to an index.
|
||||
@@ -304,7 +302,7 @@ fn index_documents(
|
||||
|
||||
let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
|
||||
|
||||
let delete_bitset_opt = if delete_cursor.get().is_some() {
|
||||
let segment_entry: SegmentEntry = if delete_cursor.get().is_some() {
|
||||
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
|
||||
let segment_reader = SegmentReader::open(segment)?;
|
||||
let mut deleted_bitset = BitSet::with_capacity(num_docs as usize);
|
||||
@@ -315,17 +313,18 @@ fn index_documents(
|
||||
&doc_to_opstamps,
|
||||
last_docstamp,
|
||||
)?;
|
||||
if may_have_deletes {
|
||||
Some(deleted_bitset)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
SegmentEntry::new(segment_meta, delete_cursor, {
|
||||
if may_have_deletes {
|
||||
Some(deleted_bitset)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
} else {
|
||||
// if there are no delete operation in the queue, no need
|
||||
// to even open the segment.
|
||||
None
|
||||
SegmentEntry::new(segment_meta, delete_cursor, None)
|
||||
};
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
|
||||
Ok(segment_updater.add_segment(generation, segment_entry))
|
||||
}
|
||||
|
||||
@@ -367,16 +366,13 @@ impl IndexWriter {
|
||||
.add_segment(self.generation, segment_entry);
|
||||
}
|
||||
|
||||
/// Creates a new segment.
|
||||
/// *Experimental & Advanced API* Creates a new segment.
|
||||
/// and marks it as currently in write.
|
||||
///
|
||||
/// This method is useful only for users trying to do complex
|
||||
/// operations, like converting an index format to another.
|
||||
///
|
||||
/// It is safe to start writing file associated to the new `Segment`.
|
||||
/// These will not be garbage collected as long as an instance object of
|
||||
/// `SegmentMeta` object associated to the new `Segment` is "alive".
|
||||
pub fn new_segment(&self) -> Segment {
|
||||
self.index.new_segment()
|
||||
self.segment_updater.new_segment()
|
||||
}
|
||||
|
||||
/// Spawns a new worker thread for indexing.
|
||||
@@ -391,7 +387,6 @@ impl IndexWriter {
|
||||
let mut delete_cursor = self.delete_queue.cursor();
|
||||
|
||||
let mem_budget = self.heap_size_in_bytes_per_thread;
|
||||
let index = self.index.clone();
|
||||
let join_handle: JoinHandle<Result<()>> = thread::Builder::new()
|
||||
.name(format!(
|
||||
"thrd-tantivy-index{}-gen{}",
|
||||
@@ -417,7 +412,7 @@ impl IndexWriter {
|
||||
// was dropped.
|
||||
return Ok(());
|
||||
}
|
||||
let segment = index.new_segment();
|
||||
let segment = segment_updater.new_segment();
|
||||
index_documents(
|
||||
mem_budget,
|
||||
&segment,
|
||||
@@ -434,7 +429,7 @@ impl IndexWriter {
|
||||
}
|
||||
|
||||
/// Accessor to the merge policy.
|
||||
pub fn get_merge_policy(&self) -> Arc<Box<MergePolicy>> {
|
||||
pub fn get_merge_policy(&self) -> Box<MergePolicy> {
|
||||
self.segment_updater.get_merge_policy()
|
||||
}
|
||||
|
||||
@@ -459,10 +454,7 @@ impl IndexWriter {
|
||||
/// Merges a given list of segments
|
||||
///
|
||||
/// `segment_ids` is required to be non-empty.
|
||||
pub fn merge(
|
||||
&mut self,
|
||||
segment_ids: &[SegmentId],
|
||||
) -> Result<impl Future<Item = SegmentMeta, Error = Canceled>> {
|
||||
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
|
||||
self.segment_updater.start_merge(segment_ids)
|
||||
}
|
||||
|
||||
@@ -475,10 +467,11 @@ impl IndexWriter {
|
||||
///
|
||||
/// Returns the former segment_ready channel.
|
||||
fn recreate_document_channel(&mut self) -> DocumentReceiver {
|
||||
let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
|
||||
let (mut document_sender, mut document_receiver): (DocumentSender, DocumentReceiver) =
|
||||
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
|
||||
mem::replace(&mut self.document_sender, document_sender);
|
||||
mem::replace(&mut self.document_receiver, document_receiver)
|
||||
swap(&mut self.document_sender, &mut document_sender);
|
||||
swap(&mut self.document_receiver, &mut document_receiver);
|
||||
document_receiver
|
||||
}
|
||||
|
||||
/// Rollback to the last commit
|
||||
@@ -565,12 +558,14 @@ impl IndexWriter {
|
||||
// and recreate a new one channels.
|
||||
self.recreate_document_channel();
|
||||
|
||||
let former_workers_join_handle = mem::replace(&mut self.workers_join_handle, Vec::new());
|
||||
let former_workers_join_handle =
|
||||
mem::replace(&mut self.workers_join_handle, Vec::new());
|
||||
|
||||
for worker_handle in former_workers_join_handle {
|
||||
let indexing_worker_result = worker_handle
|
||||
.join()
|
||||
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
|
||||
|
||||
indexing_worker_result?;
|
||||
// add a new worker for the next generation.
|
||||
self.add_indexing_worker()?;
|
||||
@@ -655,7 +650,6 @@ impl IndexWriter {
|
||||
mod tests {
|
||||
|
||||
use super::initial_table_size;
|
||||
use directory::error::LockError;
|
||||
use error::*;
|
||||
use indexer::NoMergePolicy;
|
||||
use schema::{self, Document};
|
||||
@@ -666,10 +660,10 @@ mod tests {
|
||||
fn test_lockfile_stops_duplicates() {
|
||||
let schema_builder = schema::Schema::builder();
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let _index_writer = index.writer(3_000_000).unwrap();
|
||||
match index.writer(3_000_000) {
|
||||
Err(TantivyError::LockFailure(LockError::LockBusy, _)) => {}
|
||||
_ => panic!("Expected a `LockFailure` error"),
|
||||
let _index_writer = index.writer(40_000_000).unwrap();
|
||||
match index.writer(40_000_000) {
|
||||
Err(TantivyError::LockFailure(_)) => {}
|
||||
_ => panic!("Expected FileAlreadyExists error"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -681,7 +675,8 @@ mod tests {
|
||||
match index.writer_with_num_threads(1, 3_000_000) {
|
||||
Err(err) => {
|
||||
let err_msg = err.to_string();
|
||||
assert!(err_msg.contains("already an `IndexWriter`"));
|
||||
assert!(err_msg.contains("Lockfile"));
|
||||
assert!(err_msg.contains("Possible causes:"))
|
||||
}
|
||||
_ => panic!("Expected LockfileAlreadyExists error"),
|
||||
}
|
||||
@@ -691,7 +686,7 @@ mod tests {
|
||||
fn test_set_merge_policy() {
|
||||
let schema_builder = schema::Schema::builder();
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let index_writer = index.writer(3_000_000).unwrap();
|
||||
let index_writer = index.writer(40_000_000).unwrap();
|
||||
assert_eq!(
|
||||
format!("{:?}", index_writer.get_merge_policy()),
|
||||
"LogMergePolicy { min_merge_size: 8, min_layer_size: 10000, \
|
||||
@@ -710,11 +705,11 @@ mod tests {
|
||||
let schema_builder = schema::Schema::builder();
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
{
|
||||
let _index_writer = index.writer(3_000_000).unwrap();
|
||||
let _index_writer = index.writer(40_000_000).unwrap();
|
||||
// the lock should be released when the
|
||||
// index_writer leaves the scope.
|
||||
}
|
||||
let _index_writer_two = index.writer(3_000_000).unwrap();
|
||||
let _index_writer_two = index.writer(40_000_000).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -741,7 +736,7 @@ mod tests {
|
||||
index_writer.add_document(doc!(text_field=>"b"));
|
||||
index_writer.add_document(doc!(text_field=>"c"));
|
||||
}
|
||||
assert!(index_writer.commit().is_ok());
|
||||
assert_eq!(index_writer.commit().unwrap(), 3u64);
|
||||
index.load_searchers().unwrap();
|
||||
assert_eq!(num_docs_containing("a"), 0);
|
||||
assert_eq!(num_docs_containing("b"), 1);
|
||||
|
||||
@@ -1,64 +0,0 @@
|
||||
use census::{Inventory, TrackedObject};
|
||||
use std::collections::HashSet;
|
||||
use SegmentId;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct MergeOperationInventory(Inventory<InnerMergeOperation>);
|
||||
|
||||
impl MergeOperationInventory {
|
||||
pub fn segment_in_merge(&self) -> HashSet<SegmentId> {
|
||||
let mut segment_in_merge = HashSet::default();
|
||||
for merge_op in self.0.list() {
|
||||
for &segment_id in &merge_op.segment_ids {
|
||||
segment_in_merge.insert(segment_id);
|
||||
}
|
||||
}
|
||||
segment_in_merge
|
||||
}
|
||||
}
|
||||
|
||||
/// A `MergeOperation` has two role.
|
||||
/// It carries all of the information required to describe a merge :
|
||||
/// - `target_opstamp` is the opstamp up to which we want to consume the
|
||||
/// delete queue and reflect their deletes.
|
||||
/// - `segment_ids` is the list of segment to be merged.
|
||||
///
|
||||
/// The second role is to ensure keep track of the fact that these
|
||||
/// segments are in merge and avoid starting a merge operation that
|
||||
/// may conflict with this one.
|
||||
///
|
||||
/// This works by tracking merge operations. When considering computing
|
||||
/// merge candidates, we simply list tracked merge operations and remove
|
||||
/// their segments from possible merge candidates.
|
||||
pub struct MergeOperation {
|
||||
inner: TrackedObject<InnerMergeOperation>,
|
||||
}
|
||||
|
||||
struct InnerMergeOperation {
|
||||
target_opstamp: u64,
|
||||
segment_ids: Vec<SegmentId>,
|
||||
}
|
||||
|
||||
impl MergeOperation {
|
||||
pub fn new(
|
||||
inventory: &MergeOperationInventory,
|
||||
target_opstamp: u64,
|
||||
segment_ids: Vec<SegmentId>,
|
||||
) -> MergeOperation {
|
||||
let inner_merge_operation = InnerMergeOperation {
|
||||
target_opstamp,
|
||||
segment_ids,
|
||||
};
|
||||
MergeOperation {
|
||||
inner: inventory.0.track(inner_merge_operation),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn target_opstamp(&self) -> u64 {
|
||||
self.inner.target_opstamp
|
||||
}
|
||||
|
||||
pub fn segment_ids(&self) -> &[SegmentId] {
|
||||
&self.inner.segment_ids[..]
|
||||
}
|
||||
}
|
||||
@@ -11,7 +11,7 @@ pub struct MergeCandidate(pub Vec<SegmentId>);
|
||||
///
|
||||
/// Every time a the list of segments changes, the segment updater
|
||||
/// asks the merge policy if some segments should be merged.
|
||||
pub trait MergePolicy: marker::Send + marker::Sync + Debug {
|
||||
pub trait MergePolicy: MergePolicyClone + marker::Send + marker::Sync + Debug {
|
||||
/// Given the list of segment metas, returns the list of merge candidates.
|
||||
///
|
||||
/// This call happens on the segment updater thread, and will block
|
||||
@@ -19,6 +19,21 @@ pub trait MergePolicy: marker::Send + marker::Sync + Debug {
|
||||
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate>;
|
||||
}
|
||||
|
||||
/// MergePolicyClone
|
||||
pub trait MergePolicyClone {
|
||||
/// Returns a boxed clone of the MergePolicy.
|
||||
fn box_clone(&self) -> Box<MergePolicy>;
|
||||
}
|
||||
|
||||
impl<T> MergePolicyClone for T
|
||||
where
|
||||
T: 'static + MergePolicy + Clone,
|
||||
{
|
||||
fn box_clone(&self) -> Box<MergePolicy> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Never merge segments.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NoMergePolicy;
|
||||
|
||||
@@ -836,7 +836,7 @@ mod tests {
|
||||
let score_field = schema_builder.add_u64_field("score", score_fieldtype);
|
||||
let bytes_score_field = schema_builder.add_bytes_field("score_bytes");
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
|
||||
let search_term = |searcher: &Searcher, term: Term| {
|
||||
let collector = FastFieldTestCollector::for_field(score_field);
|
||||
@@ -1130,11 +1130,15 @@ mod tests {
|
||||
let segment_ids = index
|
||||
.searchable_segment_ids()
|
||||
.expect("Searchable segments failed.");
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
.wait()
|
||||
.expect("Merging failed");
|
||||
index.load_searchers().unwrap();
|
||||
|
||||
let ref searcher = *index.searcher();
|
||||
assert!(segment_ids.is_empty());
|
||||
assert!(searcher.segment_readers().is_empty());
|
||||
assert_eq!(searcher.segment_readers().len(), 1);
|
||||
assert_eq!(searcher.num_docs(), 0);
|
||||
}
|
||||
}
|
||||
@@ -1145,7 +1149,7 @@ mod tests {
|
||||
let facet_field = schema_builder.add_facet_field("facet");
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
|
||||
let mut doc = Document::default();
|
||||
for facet in doc_facets {
|
||||
@@ -1210,7 +1214,7 @@ mod tests {
|
||||
let segment_ids = index
|
||||
.searchable_segment_ids()
|
||||
.expect("Searchable segments failed.");
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
@@ -1233,7 +1237,7 @@ mod tests {
|
||||
|
||||
// Deleting one term
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
let facet = Facet::from_path(vec!["top", "a", "firstdoc"]);
|
||||
let facet_term = Term::from_facet(facet_field, &facet);
|
||||
index_writer.delete_term(facet_term);
|
||||
@@ -1291,7 +1295,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
let mut doc = Document::default();
|
||||
doc.add_u64(int_field, 1);
|
||||
index_writer.add_document(doc.clone());
|
||||
@@ -1299,26 +1303,24 @@ mod tests {
|
||||
index_writer.add_document(doc);
|
||||
index_writer.commit().expect("commit failed");
|
||||
index_writer.delete_term(Term::from_field_u64(int_field, 1));
|
||||
|
||||
index_writer.commit().expect("commit failed");
|
||||
}
|
||||
index.load_searchers().unwrap();
|
||||
let searcher = index.searcher();
|
||||
assert_eq!(searcher.num_docs(), 0);
|
||||
// Merging the segments
|
||||
{
|
||||
let segment_ids = index
|
||||
.searchable_segment_ids()
|
||||
.expect("Searchable segments failed.");
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
.wait()
|
||||
.expect("Merging failed");
|
||||
|
||||
// assert delete has not been committed
|
||||
index.load_searchers().unwrap();
|
||||
let searcher = index.searcher();
|
||||
assert_eq!(searcher.num_docs(), 2);
|
||||
|
||||
index_writer.commit().unwrap();
|
||||
|
||||
index_writer.wait_merging_threads().unwrap();
|
||||
}
|
||||
|
||||
index.load_searchers().unwrap();
|
||||
let searcher = index.searcher();
|
||||
assert_eq!(searcher.num_docs(), 0);
|
||||
@@ -1334,7 +1336,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
let index_doc = |index_writer: &mut IndexWriter, int_vals: &[u64]| {
|
||||
let mut doc = Document::default();
|
||||
for &val in int_vals {
|
||||
@@ -1423,7 +1425,7 @@ mod tests {
|
||||
let segment_ids = index
|
||||
.searchable_segment_ids()
|
||||
.expect("Searchable segments failed.");
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
pub mod delete_queue;
|
||||
|
||||
mod directory_lock;
|
||||
mod doc_opstamp_mapping;
|
||||
pub mod index_writer;
|
||||
mod log_merge_policy;
|
||||
mod merge_operation;
|
||||
pub mod merge_policy;
|
||||
pub mod merger;
|
||||
pub mod operation;
|
||||
@@ -16,12 +15,14 @@ pub mod segment_updater;
|
||||
mod segment_writer;
|
||||
mod stamper;
|
||||
|
||||
pub(crate) use self::directory_lock::DirectoryLock;
|
||||
pub use self::directory_lock::LockType;
|
||||
|
||||
pub use self::index_writer::IndexWriter;
|
||||
pub use self::log_merge_policy::LogMergePolicy;
|
||||
pub use self::merge_operation::{MergeOperation, MergeOperationInventory};
|
||||
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
|
||||
pub use self::prepared_commit::PreparedCommit;
|
||||
pub use self::segment_entry::SegmentEntry;
|
||||
pub use self::segment_entry::{SegmentEntry, SegmentState};
|
||||
pub use self::segment_manager::SegmentManager;
|
||||
pub use self::segment_serializer::SegmentSerializer;
|
||||
pub use self::segment_writer::SegmentWriter;
|
||||
|
||||
@@ -4,6 +4,21 @@ use core::SegmentMeta;
|
||||
use indexer::delete_queue::DeleteCursor;
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
pub enum SegmentState {
|
||||
Ready,
|
||||
InMerge,
|
||||
}
|
||||
|
||||
impl SegmentState {
|
||||
pub fn letter_code(self) -> char {
|
||||
match self {
|
||||
SegmentState::InMerge => 'M',
|
||||
SegmentState::Ready => 'R',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A segment entry describes the state of
|
||||
/// a given segment, at a given instant.
|
||||
///
|
||||
@@ -20,6 +35,7 @@ use std::fmt;
|
||||
#[derive(Clone)]
|
||||
pub struct SegmentEntry {
|
||||
meta: SegmentMeta,
|
||||
state: SegmentState,
|
||||
delete_bitset: Option<BitSet>,
|
||||
delete_cursor: DeleteCursor,
|
||||
}
|
||||
@@ -33,6 +49,7 @@ impl SegmentEntry {
|
||||
) -> SegmentEntry {
|
||||
SegmentEntry {
|
||||
meta: segment_meta,
|
||||
state: SegmentState::Ready,
|
||||
delete_bitset,
|
||||
delete_cursor,
|
||||
}
|
||||
@@ -55,6 +72,14 @@ impl SegmentEntry {
|
||||
&mut self.delete_cursor
|
||||
}
|
||||
|
||||
/// Return the `SegmentEntry`.
|
||||
///
|
||||
/// The state describes whether the segment is available for
|
||||
/// a merge or not.
|
||||
pub fn state(&self) -> SegmentState {
|
||||
self.state
|
||||
}
|
||||
|
||||
/// Returns the segment id.
|
||||
pub fn segment_id(&self) -> SegmentId {
|
||||
self.meta.id()
|
||||
@@ -64,10 +89,33 @@ impl SegmentEntry {
|
||||
pub fn meta(&self) -> &SegmentMeta {
|
||||
&self.meta
|
||||
}
|
||||
|
||||
/// Mark the `SegmentEntry` as in merge.
|
||||
///
|
||||
/// Only segments that are not already
|
||||
/// in a merge are elligible for future merge.
|
||||
pub fn start_merge(&mut self) {
|
||||
self.state = SegmentState::InMerge;
|
||||
}
|
||||
|
||||
/// Cancel a merge
|
||||
///
|
||||
/// If a merge fails, it is important to switch
|
||||
/// the segment back to a idle state, so that it
|
||||
/// may be elligible for future merges.
|
||||
pub fn cancel_merge(&mut self) {
|
||||
self.state = SegmentState::Ready;
|
||||
}
|
||||
|
||||
/// Returns true iff a segment should
|
||||
/// be considered for a merge.
|
||||
pub fn is_ready(&self) -> bool {
|
||||
self.state == SegmentState::Ready
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SegmentEntry {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(formatter, "SegmentEntry({:?})", self.meta)
|
||||
write!(formatter, "SegmentEntry({:?}, {:?})", self.meta, self.state)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ use Result as TantivyResult;
|
||||
struct SegmentRegisters {
|
||||
uncommitted: SegmentRegister,
|
||||
committed: SegmentRegister,
|
||||
writing: HashSet<SegmentId>,
|
||||
}
|
||||
|
||||
/// The segment manager stores the list of segments
|
||||
@@ -40,17 +41,12 @@ impl Debug for SegmentManager {
|
||||
}
|
||||
|
||||
pub fn get_mergeable_segments(
|
||||
in_merge_segment_ids: &HashSet<SegmentId>,
|
||||
segment_manager: &SegmentManager,
|
||||
) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
|
||||
let registers_lock = segment_manager.read();
|
||||
(
|
||||
registers_lock
|
||||
.committed
|
||||
.get_mergeable_segments(in_merge_segment_ids),
|
||||
registers_lock
|
||||
.uncommitted
|
||||
.get_mergeable_segments(in_merge_segment_ids),
|
||||
registers_lock.committed.get_mergeable_segments(),
|
||||
registers_lock.uncommitted.get_mergeable_segments(),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -63,6 +59,7 @@ impl SegmentManager {
|
||||
registers: RwLock::new(SegmentRegisters {
|
||||
uncommitted: SegmentRegister::default(),
|
||||
committed: SegmentRegister::new(segment_metas, delete_cursor),
|
||||
writing: HashSet::new(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -75,6 +72,12 @@ impl SegmentManager {
|
||||
segment_entries
|
||||
}
|
||||
|
||||
/// Returns the overall number of segments in the `SegmentManager`
|
||||
pub fn num_segments(&self) -> usize {
|
||||
let registers_lock = self.read();
|
||||
registers_lock.committed.len() + registers_lock.uncommitted.len()
|
||||
}
|
||||
|
||||
/// List the files that are useful to the index.
|
||||
///
|
||||
/// This does not include lock files, or files that are obsolete
|
||||
@@ -103,21 +106,6 @@ impl SegmentManager {
|
||||
.expect("Failed to acquire write lock on SegmentManager.")
|
||||
}
|
||||
|
||||
/// Deletes all empty segments
|
||||
fn remove_empty_segments(&self) {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock
|
||||
.committed
|
||||
.segment_entries()
|
||||
.iter()
|
||||
.filter(|segment| segment.meta().num_docs() == 0)
|
||||
.for_each(|segment| {
|
||||
registers_lock
|
||||
.committed
|
||||
.remove_segment(&segment.segment_id())
|
||||
});
|
||||
}
|
||||
|
||||
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock.committed.clear();
|
||||
@@ -133,22 +121,25 @@ impl SegmentManager {
|
||||
/// the `segment_ids` are not either all committed or all
|
||||
/// uncommitted.
|
||||
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
|
||||
let registers_lock = self.read();
|
||||
let mut registers_lock = self.write();
|
||||
let mut segment_entries = vec![];
|
||||
if registers_lock.uncommitted.contains_all(segment_ids) {
|
||||
for segment_id in segment_ids {
|
||||
let segment_entry = registers_lock.uncommitted
|
||||
.get(segment_id)
|
||||
.start_merge(segment_id)
|
||||
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
||||
segment_entries.push(segment_entry);
|
||||
}
|
||||
} else if registers_lock.committed.contains_all(segment_ids) {
|
||||
for segment_id in segment_ids {
|
||||
let segment_entry = registers_lock.committed
|
||||
.get(segment_id)
|
||||
.start_merge(segment_id)
|
||||
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
||||
segment_entries.push(segment_entry);
|
||||
}
|
||||
for segment_id in segment_ids {
|
||||
registers_lock.committed.start_merge(segment_id);
|
||||
}
|
||||
} else {
|
||||
let error_msg = "Merge operation sent for segments that are not \
|
||||
all uncommited or commited."
|
||||
@@ -158,8 +149,50 @@ impl SegmentManager {
|
||||
Ok(segment_entries)
|
||||
}
|
||||
|
||||
pub fn cancel_merge(
|
||||
&self,
|
||||
before_merge_segment_ids: &[SegmentId],
|
||||
after_merge_segment_id: SegmentId,
|
||||
) {
|
||||
let mut registers_lock = self.write();
|
||||
|
||||
// we mark all segments are ready for merge.
|
||||
{
|
||||
let target_segment_register: &mut SegmentRegister;
|
||||
target_segment_register = {
|
||||
if registers_lock
|
||||
.uncommitted
|
||||
.contains_all(before_merge_segment_ids)
|
||||
{
|
||||
&mut registers_lock.uncommitted
|
||||
} else if registers_lock
|
||||
.committed
|
||||
.contains_all(before_merge_segment_ids)
|
||||
{
|
||||
&mut registers_lock.committed
|
||||
} else {
|
||||
warn!("couldn't find segment in SegmentManager");
|
||||
return;
|
||||
}
|
||||
};
|
||||
for segment_id in before_merge_segment_ids {
|
||||
target_segment_register.cancel_merge(segment_id);
|
||||
}
|
||||
}
|
||||
|
||||
// ... and we make sure the target segment entry
|
||||
// can be garbage collected.
|
||||
registers_lock.writing.remove(&after_merge_segment_id);
|
||||
}
|
||||
|
||||
pub fn write_segment(&self, segment_id: SegmentId) {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock.writing.insert(segment_id);
|
||||
}
|
||||
|
||||
pub fn add_segment(&self, segment_entry: SegmentEntry) {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock.writing.remove(&segment_entry.segment_id());
|
||||
registers_lock.uncommitted.add_segment_entry(segment_entry);
|
||||
}
|
||||
|
||||
@@ -169,6 +202,10 @@ impl SegmentManager {
|
||||
after_merge_segment_entry: SegmentEntry,
|
||||
) {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock
|
||||
.writing
|
||||
.remove(&after_merge_segment_entry.segment_id());
|
||||
|
||||
let target_register: &mut SegmentRegister = {
|
||||
if registers_lock
|
||||
.uncommitted
|
||||
@@ -192,7 +229,6 @@ impl SegmentManager {
|
||||
}
|
||||
|
||||
pub fn committed_segment_metas(&self) -> Vec<SegmentMeta> {
|
||||
self.remove_empty_segments();
|
||||
let registers_lock = self.read();
|
||||
registers_lock.committed.segment_metas()
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ use core::SegmentMeta;
|
||||
use indexer::delete_queue::DeleteCursor;
|
||||
use indexer::segment_entry::SegmentEntry;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
|
||||
/// The segment register keeps track
|
||||
@@ -22,8 +21,8 @@ pub struct SegmentRegister {
|
||||
impl Debug for SegmentRegister {
|
||||
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
|
||||
write!(f, "SegmentRegister(")?;
|
||||
for k in self.segment_states.keys() {
|
||||
write!(f, "{}, ", k.short_uuid_string())?;
|
||||
for (k, v) in &self.segment_states {
|
||||
write!(f, "{}:{}, ", k.short_uuid_string(), v.state().letter_code())?;
|
||||
}
|
||||
write!(f, ")")?;
|
||||
Ok(())
|
||||
@@ -35,13 +34,14 @@ impl SegmentRegister {
|
||||
self.segment_states.clear();
|
||||
}
|
||||
|
||||
pub fn get_mergeable_segments(
|
||||
&self,
|
||||
in_merge_segment_ids: &HashSet<SegmentId>,
|
||||
) -> Vec<SegmentMeta> {
|
||||
pub fn len(&self) -> usize {
|
||||
self.segment_states.len()
|
||||
}
|
||||
|
||||
pub fn get_mergeable_segments(&self) -> Vec<SegmentMeta> {
|
||||
self.segment_states
|
||||
.values()
|
||||
.filter(|segment_entry| !in_merge_segment_ids.contains(&segment_entry.segment_id()))
|
||||
.filter(|segment_entry| segment_entry.is_ready())
|
||||
.map(|segment_entry| segment_entry.meta().clone())
|
||||
.collect()
|
||||
}
|
||||
@@ -60,7 +60,7 @@ impl SegmentRegister {
|
||||
segment_ids
|
||||
}
|
||||
|
||||
pub fn contains_all(&self, segment_ids: &[SegmentId]) -> bool {
|
||||
pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool {
|
||||
segment_ids
|
||||
.iter()
|
||||
.all(|segment_id| self.segment_states.contains_key(segment_id))
|
||||
@@ -75,8 +75,20 @@ impl SegmentRegister {
|
||||
self.segment_states.remove(segment_id);
|
||||
}
|
||||
|
||||
pub fn get(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
self.segment_states.get(segment_id).cloned()
|
||||
pub fn cancel_merge(&mut self, segment_id: &SegmentId) {
|
||||
self.segment_states
|
||||
.get_mut(segment_id)
|
||||
.expect("Received a merge notification for a segment that is not registered")
|
||||
.cancel_merge();
|
||||
}
|
||||
|
||||
pub fn start_merge(&mut self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
if let Some(segment_entry) = self.segment_states.get_mut(segment_id) {
|
||||
segment_entry.start_merge();
|
||||
Some(segment_entry.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
|
||||
@@ -88,6 +100,11 @@ impl SegmentRegister {
|
||||
}
|
||||
SegmentRegister { segment_states }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
self.segment_states.get(segment_id).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -96,6 +113,7 @@ mod tests {
|
||||
use core::SegmentId;
|
||||
use core::SegmentMeta;
|
||||
use indexer::delete_queue::*;
|
||||
use indexer::SegmentState;
|
||||
|
||||
fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> {
|
||||
segment_register
|
||||
@@ -119,12 +137,42 @@ mod tests {
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
assert_eq!(
|
||||
segment_register
|
||||
.segment_entry(&segment_id_a)
|
||||
.unwrap()
|
||||
.state(),
|
||||
SegmentState::Ready
|
||||
);
|
||||
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
|
||||
{
|
||||
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
assert_eq!(
|
||||
segment_register
|
||||
.segment_entry(&segment_id_b)
|
||||
.unwrap()
|
||||
.state(),
|
||||
SegmentState::Ready
|
||||
);
|
||||
segment_register.start_merge(&segment_id_a);
|
||||
segment_register.start_merge(&segment_id_b);
|
||||
assert_eq!(
|
||||
segment_register
|
||||
.segment_entry(&segment_id_a)
|
||||
.unwrap()
|
||||
.state(),
|
||||
SegmentState::InMerge
|
||||
);
|
||||
assert_eq!(
|
||||
segment_register
|
||||
.segment_entry(&segment_id_b)
|
||||
.unwrap()
|
||||
.state(),
|
||||
SegmentState::InMerge
|
||||
);
|
||||
segment_register.remove_segment(&segment_id_a);
|
||||
segment_register.remove_segment(&segment_id_b);
|
||||
{
|
||||
|
||||
@@ -16,10 +16,8 @@ use futures_cpupool::CpuFuture;
|
||||
use futures_cpupool::CpuPool;
|
||||
use indexer::delete_queue::DeleteCursor;
|
||||
use indexer::index_writer::advance_deletes;
|
||||
use indexer::merge_operation::MergeOperationInventory;
|
||||
use indexer::merger::IndexMerger;
|
||||
use indexer::stamper::Stamper;
|
||||
use indexer::MergeOperation;
|
||||
use indexer::SegmentEntry;
|
||||
use indexer::SegmentSerializer;
|
||||
use indexer::{DefaultMergePolicy, MergePolicy};
|
||||
@@ -27,7 +25,6 @@ use schema::Schema;
|
||||
use serde_json;
|
||||
use std::borrow::BorrowMut;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::io::Write;
|
||||
use std::mem;
|
||||
use std::ops::DerefMut;
|
||||
@@ -53,10 +50,9 @@ pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> {
|
||||
segments: Vec::new(),
|
||||
schema,
|
||||
opstamp: 0u64,
|
||||
payload: None,
|
||||
payload: None
|
||||
},
|
||||
directory,
|
||||
)
|
||||
directory)
|
||||
}
|
||||
|
||||
/// Save the index meta file.
|
||||
@@ -68,9 +64,17 @@ pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> {
|
||||
/// and flushed.
|
||||
///
|
||||
/// This method is not part of tantivy's public API
|
||||
fn save_metas(metas: &IndexMeta, directory: &mut Directory) -> Result<()> {
|
||||
fn save_metas(
|
||||
metas: &IndexMeta,
|
||||
directory: &mut Directory,
|
||||
) -> Result<()> {
|
||||
// let metas = IndexMeta {
|
||||
// segments: segment_metas,
|
||||
// schema,
|
||||
// opstamp,
|
||||
// payload,
|
||||
// };
|
||||
let mut buffer = serde_json::to_vec_pretty(metas)?;
|
||||
// Just adding a new line at the end of the buffer.
|
||||
writeln!(&mut buffer)?;
|
||||
directory.atomic_write(&META_FILEPATH, &buffer[..])?;
|
||||
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
|
||||
@@ -82,21 +86,21 @@ fn save_metas(metas: &IndexMeta, directory: &mut Directory) -> Result<()> {
|
||||
//
|
||||
// All this processing happens on a single thread
|
||||
// consuming a common queue.
|
||||
//
|
||||
// We voluntarily pass a merge_operation ref to guarantee that
|
||||
// the merge_operation is alive during the process
|
||||
#[derive(Clone)]
|
||||
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
|
||||
|
||||
struct MergeOperation {
|
||||
pub target_opstamp: u64,
|
||||
pub segment_ids: Vec<SegmentId>,
|
||||
}
|
||||
|
||||
fn perform_merge(
|
||||
merge_operation: &MergeOperation,
|
||||
index: &Index,
|
||||
mut segment_entries: Vec<SegmentEntry>,
|
||||
mut merged_segment: Segment,
|
||||
target_opstamp: u64,
|
||||
) -> Result<SegmentEntry> {
|
||||
let target_opstamp = merge_operation.target_opstamp();
|
||||
|
||||
// first we need to apply deletes to our segment.
|
||||
let mut merged_segment = index.new_segment();
|
||||
|
||||
// TODO add logging
|
||||
let schema = index.schema();
|
||||
@@ -140,13 +144,12 @@ struct InnerSegmentUpdater {
|
||||
pool: CpuPool,
|
||||
index: Index,
|
||||
segment_manager: SegmentManager,
|
||||
merge_policy: RwLock<Arc<Box<MergePolicy>>>,
|
||||
merge_policy: RwLock<Box<MergePolicy>>,
|
||||
merging_thread_id: AtomicUsize,
|
||||
merging_threads: RwLock<HashMap<usize, JoinHandle<Result<()>>>>,
|
||||
generation: AtomicUsize,
|
||||
killed: AtomicBool,
|
||||
stamper: Stamper,
|
||||
merge_operations: MergeOperationInventory,
|
||||
}
|
||||
|
||||
impl SegmentUpdater {
|
||||
@@ -167,23 +170,28 @@ impl SegmentUpdater {
|
||||
pool,
|
||||
index,
|
||||
segment_manager,
|
||||
merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
|
||||
merge_policy: RwLock::new(Box::new(DefaultMergePolicy::default())),
|
||||
merging_thread_id: AtomicUsize::default(),
|
||||
merging_threads: RwLock::new(HashMap::new()),
|
||||
generation: AtomicUsize::default(),
|
||||
killed: AtomicBool::new(false),
|
||||
stamper,
|
||||
merge_operations: Default::default(),
|
||||
})))
|
||||
}
|
||||
|
||||
pub fn get_merge_policy(&self) -> Arc<Box<MergePolicy>> {
|
||||
self.0.merge_policy.read().unwrap().clone()
|
||||
pub fn new_segment(&self) -> Segment {
|
||||
let new_segment = self.0.index.new_segment();
|
||||
let segment_id = new_segment.id();
|
||||
self.0.segment_manager.write_segment(segment_id);
|
||||
new_segment
|
||||
}
|
||||
|
||||
pub fn get_merge_policy(&self) -> Box<MergePolicy> {
|
||||
self.0.merge_policy.read().unwrap().box_clone()
|
||||
}
|
||||
|
||||
pub fn set_merge_policy(&self, merge_policy: Box<MergePolicy>) {
|
||||
let arc_merge_policy = Arc::new(merge_policy);
|
||||
*self.0.merge_policy.write().unwrap() = arc_merge_policy;
|
||||
*self.0.merge_policy.write().unwrap() = merge_policy;
|
||||
}
|
||||
|
||||
fn get_merging_thread_id(&self) -> usize {
|
||||
@@ -257,10 +265,13 @@ impl SegmentUpdater {
|
||||
segments: commited_segment_metas,
|
||||
schema: index.schema(),
|
||||
opstamp,
|
||||
payload: commit_message,
|
||||
payload: commit_message
|
||||
};
|
||||
save_metas(&index_meta, directory.box_clone().borrow_mut())
|
||||
.expect("Could not save metas.");
|
||||
save_metas(
|
||||
&index_meta,
|
||||
directory.box_clone().borrow_mut(),
|
||||
)
|
||||
.expect("Could not save metas.");
|
||||
self.store_meta(&index_meta);
|
||||
}
|
||||
}
|
||||
@@ -296,14 +307,12 @@ impl SegmentUpdater {
|
||||
}
|
||||
|
||||
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
|
||||
let segment_ids_vec = segment_ids.to_vec();
|
||||
let commit_opstamp = self.load_metas().opstamp;
|
||||
let merge_operation = MergeOperation::new(
|
||||
&self.0.merge_operations,
|
||||
commit_opstamp,
|
||||
segment_ids.to_vec(),
|
||||
);
|
||||
self.run_async(move |segment_updater| segment_updater.start_merge_impl(merge_operation))
|
||||
.wait()?
|
||||
self.run_async(move |segment_updater| {
|
||||
segment_updater.start_merge_impl(&segment_ids_vec[..], commit_opstamp)
|
||||
})
|
||||
.wait()?
|
||||
}
|
||||
|
||||
fn store_meta(&self, index_meta: &IndexMeta) {
|
||||
@@ -314,25 +323,22 @@ impl SegmentUpdater {
|
||||
}
|
||||
|
||||
// `segment_ids` is required to be non-empty.
|
||||
fn start_merge_impl(&self, merge_operation: MergeOperation) -> Result<Receiver<SegmentMeta>> {
|
||||
assert!(
|
||||
!merge_operation.segment_ids().is_empty(),
|
||||
"Segment_ids cannot be empty."
|
||||
);
|
||||
fn start_merge_impl(
|
||||
&self,
|
||||
segment_ids: &[SegmentId],
|
||||
target_opstamp: u64,
|
||||
) -> Result<Receiver<SegmentMeta>> {
|
||||
assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty.");
|
||||
|
||||
let segment_updater_clone = self.clone();
|
||||
let segment_entries: Vec<SegmentEntry> = self
|
||||
.0
|
||||
.segment_manager
|
||||
.start_merge(merge_operation.segment_ids())?;
|
||||
let segment_entries: Vec<SegmentEntry> = self.0.segment_manager.start_merge(segment_ids)?;
|
||||
|
||||
// let segment_ids_vec = merge_operation.segment_ids.to_vec();
|
||||
let segment_ids_vec = segment_ids.to_vec();
|
||||
|
||||
let merging_thread_id = self.get_merging_thread_id();
|
||||
info!(
|
||||
"Starting merge thread #{} - {:?}",
|
||||
merging_thread_id,
|
||||
merge_operation.segment_ids()
|
||||
merging_thread_id, segment_ids
|
||||
);
|
||||
let (merging_future_send, merging_future_recv) = oneshot();
|
||||
|
||||
@@ -341,17 +347,20 @@ impl SegmentUpdater {
|
||||
.name(format!("mergingthread-{}", merging_thread_id))
|
||||
.spawn(move || {
|
||||
// first we need to apply deletes to our segment.
|
||||
let merged_segment = segment_updater_clone.new_segment();
|
||||
let merged_segment_id = merged_segment.id();
|
||||
let merge_result = perform_merge(
|
||||
&merge_operation,
|
||||
&segment_updater_clone.0.index,
|
||||
segment_entries,
|
||||
merged_segment,
|
||||
target_opstamp,
|
||||
);
|
||||
|
||||
match merge_result {
|
||||
Ok(after_merge_segment_entry) => {
|
||||
let merged_segment_meta = after_merge_segment_entry.meta().clone();
|
||||
segment_updater_clone
|
||||
.end_merge(merge_operation, after_merge_segment_entry)
|
||||
.end_merge(segment_ids_vec, after_merge_segment_entry)
|
||||
.expect("Segment updater thread is corrupted.");
|
||||
|
||||
// the future may fail if the listener of the oneshot future
|
||||
@@ -362,18 +371,13 @@ impl SegmentUpdater {
|
||||
let _merging_future_res = merging_future_send.send(merged_segment_meta);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Merge of {:?} was cancelled: {:?}",
|
||||
merge_operation.segment_ids(),
|
||||
e
|
||||
);
|
||||
warn!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
|
||||
// ... cancel merge
|
||||
if cfg!(test) {
|
||||
panic!("Merge failed.");
|
||||
}
|
||||
// As `merge_operation` will be dropped, the segment in merge state will
|
||||
// be available for merge again.
|
||||
// `merging_future_send` will be dropped, sending an error to the future.
|
||||
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
|
||||
// merging_future_send will be dropped, sending an error to the future.
|
||||
}
|
||||
}
|
||||
segment_updater_clone
|
||||
@@ -394,34 +398,37 @@ impl SegmentUpdater {
|
||||
}
|
||||
|
||||
fn consider_merge_options(&self) {
|
||||
let merge_segment_ids: HashSet<SegmentId> = self.0.merge_operations.segment_in_merge();
|
||||
let (committed_segments, uncommitted_segments) =
|
||||
get_mergeable_segments(&merge_segment_ids, &self.0.segment_manager);
|
||||
|
||||
get_mergeable_segments(&self.0.segment_manager);
|
||||
// Committed segments cannot be merged with uncommitted_segments.
|
||||
// We therefore consider merges using these two sets of segments independently.
|
||||
let merge_policy = self.get_merge_policy();
|
||||
|
||||
let current_opstamp = self.0.stamper.stamp();
|
||||
let mut merge_candidates: Vec<MergeOperation> = merge_policy
|
||||
let mut merge_candidates = merge_policy
|
||||
.compute_merge_candidates(&uncommitted_segments)
|
||||
.into_iter()
|
||||
.map(|merge_candidate| {
|
||||
MergeOperation::new(&self.0.merge_operations, current_opstamp, merge_candidate.0)
|
||||
.map(|merge_candidate| MergeOperation {
|
||||
target_opstamp: current_opstamp,
|
||||
segment_ids: merge_candidate.0,
|
||||
})
|
||||
.collect();
|
||||
|
||||
.collect::<Vec<_>>();
|
||||
let commit_opstamp = self.load_metas().opstamp;
|
||||
let committed_merge_candidates = merge_policy
|
||||
.compute_merge_candidates(&committed_segments)
|
||||
.into_iter()
|
||||
.map(|merge_candidate| {
|
||||
MergeOperation::new(&self.0.merge_operations, commit_opstamp, merge_candidate.0)
|
||||
.map(|merge_candidate| MergeOperation {
|
||||
target_opstamp: commit_opstamp,
|
||||
segment_ids: merge_candidate.0,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
merge_candidates.extend(committed_merge_candidates.into_iter());
|
||||
for merge_operation in merge_candidates {
|
||||
match self.start_merge_impl(merge_operation) {
|
||||
for MergeOperation {
|
||||
target_opstamp,
|
||||
segment_ids,
|
||||
} in merge_candidates
|
||||
{
|
||||
match self.start_merge_impl(&segment_ids, target_opstamp) {
|
||||
Ok(merge_future) => {
|
||||
if let Err(e) = merge_future.fuse().poll() {
|
||||
error!("The merge task failed quickly after starting: {:?}", e);
|
||||
@@ -437,9 +444,19 @@ impl SegmentUpdater {
|
||||
}
|
||||
}
|
||||
|
||||
fn cancel_merge(
|
||||
&self,
|
||||
before_merge_segment_ids: &[SegmentId],
|
||||
after_merge_segment_entry: SegmentId,
|
||||
) {
|
||||
self.0
|
||||
.segment_manager
|
||||
.cancel_merge(before_merge_segment_ids, after_merge_segment_entry);
|
||||
}
|
||||
|
||||
fn end_merge(
|
||||
&self,
|
||||
merge_operation: MergeOperation,
|
||||
before_merge_segment_ids: Vec<SegmentId>,
|
||||
mut after_merge_segment_entry: SegmentEntry,
|
||||
) -> Result<()> {
|
||||
self.run_async(move |segment_updater| {
|
||||
@@ -455,15 +472,16 @@ impl SegmentUpdater {
|
||||
{
|
||||
error!(
|
||||
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
|
||||
merge_operation.segment_ids(),
|
||||
e
|
||||
before_merge_segment_ids, e
|
||||
);
|
||||
// ... cancel merge
|
||||
if cfg!(test) {
|
||||
panic!("Merge failed.");
|
||||
}
|
||||
// ... cancel merge
|
||||
// `merge_operations` are tracked. As it is dropped, the
|
||||
// the segment_ids will be available again for merge.
|
||||
segment_updater.cancel_merge(
|
||||
&before_merge_segment_ids,
|
||||
after_merge_segment_entry.segment_id(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -471,7 +489,7 @@ impl SegmentUpdater {
|
||||
segment_updater
|
||||
.0
|
||||
.segment_manager
|
||||
.end_merge(merge_operation.segment_ids(), after_merge_segment_entry);
|
||||
.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
|
||||
segment_updater.consider_merge_options();
|
||||
info!("save metas");
|
||||
let previous_metas = segment_updater.load_metas();
|
||||
@@ -497,25 +515,32 @@ impl SegmentUpdater {
|
||||
/// Obsolete files will eventually be cleaned up
|
||||
/// by the directory garbage collector.
|
||||
pub fn wait_merging_thread(&self) -> Result<()> {
|
||||
let mut num_segments: usize;
|
||||
loop {
|
||||
let merging_threads: HashMap<usize, JoinHandle<Result<()>>> = {
|
||||
num_segments = self.0.segment_manager.num_segments();
|
||||
|
||||
let mut new_merging_threads = HashMap::new();
|
||||
{
|
||||
let mut merging_threads = self.0.merging_threads.write().unwrap();
|
||||
mem::replace(merging_threads.deref_mut(), HashMap::new())
|
||||
};
|
||||
if merging_threads.is_empty() {
|
||||
return Ok(());
|
||||
mem::swap(&mut new_merging_threads, merging_threads.deref_mut());
|
||||
}
|
||||
debug!("wait merging thread {}", merging_threads.len());
|
||||
for (_, merging_thread_handle) in merging_threads {
|
||||
debug!("wait merging thread {}", new_merging_threads.len());
|
||||
for (_, merging_thread_handle) in new_merging_threads {
|
||||
merging_thread_handle
|
||||
.join()
|
||||
.map(|_| ())
|
||||
.map_err(|_| TantivyError::ErrorInThread("Merging thread failed.".into()))?;
|
||||
}
|
||||
// Our merging thread may have queued their completed merged segment.
|
||||
// Let's wait for that too.
|
||||
// Our merging thread may have queued their completed
|
||||
self.run_async(move |_| {}).wait()?;
|
||||
|
||||
let new_num_segments = self.0.segment_manager.num_segments();
|
||||
|
||||
if new_num_segments >= num_segments {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -535,7 +560,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema);
|
||||
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.set_merge_policy(Box::new(MergeWheneverPossible));
|
||||
|
||||
{
|
||||
@@ -579,75 +604,4 @@ mod tests {
|
||||
assert_eq!(index.searcher().segment_readers().len(), 1);
|
||||
assert_eq!(index.searcher().num_docs(), 302);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delete_all_docs() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let index = Index::create_in_ram(schema);
|
||||
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
|
||||
{
|
||||
for _ in 0..100 {
|
||||
index_writer.add_document(doc!(text_field=>"a"));
|
||||
index_writer.add_document(doc!(text_field=>"b"));
|
||||
}
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
|
||||
{
|
||||
for _ in 0..100 {
|
||||
index_writer.add_document(doc!(text_field=>"c"));
|
||||
index_writer.add_document(doc!(text_field=>"d"));
|
||||
}
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
|
||||
{
|
||||
index_writer.add_document(doc!(text_field=>"e"));
|
||||
index_writer.add_document(doc!(text_field=>"f"));
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
|
||||
{
|
||||
let seg_ids = index
|
||||
.searchable_segment_ids()
|
||||
.expect("Searchable segments failed.");
|
||||
// docs exist, should have at least 1 segment
|
||||
assert!(seg_ids.len() > 0);
|
||||
}
|
||||
|
||||
{
|
||||
let term_vals = vec!["a", "b", "c", "d", "e", "f"];
|
||||
for term_val in term_vals {
|
||||
let term = Term::from_field_text(text_field, term_val);
|
||||
index_writer.delete_term(term);
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
index_writer
|
||||
.wait_merging_threads()
|
||||
.expect("waiting for merging threads");
|
||||
}
|
||||
|
||||
index.load_searchers().unwrap();
|
||||
assert_eq!(index.searcher().num_docs(), 0);
|
||||
|
||||
let seg_ids = index
|
||||
.searchable_segment_ids()
|
||||
.expect("Searchable segments failed.");
|
||||
assert!(seg_ids.is_empty());
|
||||
|
||||
index.load_searchers().unwrap();
|
||||
assert_eq!(index.searcher().num_docs(), 0);
|
||||
// empty segments should be erased
|
||||
assert!(index.searchable_segment_metas().unwrap().is_empty());
|
||||
assert!(index.searcher().segment_readers().is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
|
||||
// AtomicU64 have not landed in stable.
|
||||
// For the moment let's just use AtomicUsize on
|
||||
@@ -62,6 +63,7 @@ impl Stamper {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
|
||||
40
src/lib.rs
40
src/lib.rs
@@ -129,7 +129,10 @@ extern crate base64;
|
||||
extern crate bit_set;
|
||||
extern crate bitpacking;
|
||||
extern crate byteorder;
|
||||
extern crate scoped_pool;
|
||||
|
||||
extern crate combine;
|
||||
|
||||
extern crate crossbeam;
|
||||
extern crate fnv;
|
||||
extern crate fst;
|
||||
@@ -143,7 +146,6 @@ extern crate num_cpus;
|
||||
extern crate owning_ref;
|
||||
extern crate regex;
|
||||
extern crate rust_stemmers;
|
||||
extern crate scoped_pool;
|
||||
extern crate serde;
|
||||
extern crate stable_deref_trait;
|
||||
extern crate tempdir;
|
||||
@@ -168,7 +170,7 @@ extern crate maplit;
|
||||
extern crate test;
|
||||
|
||||
#[macro_use]
|
||||
extern crate downcast_rs;
|
||||
extern crate downcast;
|
||||
|
||||
#[macro_use]
|
||||
extern crate fail;
|
||||
@@ -229,7 +231,11 @@ pub use common::{i64_to_u64, u64_to_i64};
|
||||
/// Expose the current version of tantivy, as well
|
||||
/// whether it was compiled with the simd compression.
|
||||
pub fn version() -> &'static str {
|
||||
env!("CARGO_PKG_VERSION")
|
||||
if cfg!(feature = "simdcompression") {
|
||||
concat!(env!("CARGO_PKG_VERSION"), "-simd")
|
||||
} else {
|
||||
concat!(env!("CARGO_PKG_VERSION"), "-nosimd")
|
||||
}
|
||||
}
|
||||
|
||||
/// Defines tantivy's merging strategy
|
||||
@@ -342,7 +348,7 @@ mod tests {
|
||||
let index = Index::create_from_tempdir(schema).unwrap();
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
let doc = doc!(text_field=>"af b");
|
||||
index_writer.add_document(doc);
|
||||
@@ -364,7 +370,7 @@ mod tests {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
index_writer.add_document(doc!(text_field=>"a b c"));
|
||||
index_writer.commit().unwrap();
|
||||
@@ -406,7 +412,7 @@ mod tests {
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
let doc = doc!(text_field=>"a b c");
|
||||
index_writer.add_document(doc);
|
||||
@@ -434,7 +440,7 @@ mod tests {
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
let doc = doc!(text_field=>"a b c");
|
||||
index_writer.add_document(doc);
|
||||
@@ -481,7 +487,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
// 0
|
||||
index_writer.add_document(doc!(text_field=>"a b"));
|
||||
// 1
|
||||
@@ -528,7 +534,7 @@ mod tests {
|
||||
}
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
// 0
|
||||
index_writer.add_document(doc!(text_field=>"a b"));
|
||||
// 1
|
||||
@@ -565,7 +571,7 @@ mod tests {
|
||||
}
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"a b"));
|
||||
index_writer.delete_term(Term::from_field_text(text_field, "c"));
|
||||
index_writer.rollback().unwrap();
|
||||
@@ -614,7 +620,7 @@ mod tests {
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.add_document(doc!(field=>1u64));
|
||||
index_writer.commit().unwrap();
|
||||
index.load_searchers().unwrap();
|
||||
@@ -637,7 +643,7 @@ mod tests {
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
let negative_val = -1i64;
|
||||
index_writer.add_document(doc!(value_field => negative_val));
|
||||
index_writer.commit().unwrap();
|
||||
@@ -661,7 +667,7 @@ mod tests {
|
||||
let absent_field = schema_builder.add_text_field("text", TEXT);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_with_num_threads(2, 6_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(2, 40_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"a"));
|
||||
assert!(index_writer.commit().is_ok());
|
||||
assert!(index.load_searchers().is_ok());
|
||||
@@ -678,7 +684,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema);
|
||||
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(2, 6_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(2, 40_000_000).unwrap();
|
||||
|
||||
let add_document = |index_writer: &mut IndexWriter, val: &'static str| {
|
||||
let doc = doc!(text_field=>val);
|
||||
@@ -714,7 +720,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
let doc = doc!(text_field=>"af af af bc bc");
|
||||
index_writer.add_document(doc);
|
||||
@@ -750,7 +756,7 @@ mod tests {
|
||||
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"af af af b"));
|
||||
index_writer.add_document(doc!(text_field=>"a b c"));
|
||||
index_writer.add_document(doc!(text_field=>"a b c d"));
|
||||
@@ -803,7 +809,7 @@ mod tests {
|
||||
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
let doc = doc!(text_field=>"af b");
|
||||
index_writer.add_document(doc);
|
||||
|
||||
@@ -280,7 +280,7 @@ pub mod tests {
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
let mut doc = Document::default();
|
||||
doc.add_text(text_field, "g b b d c g c");
|
||||
@@ -322,7 +322,7 @@ pub mod tests {
|
||||
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
for i in 0..num_docs {
|
||||
let mut doc = Document::default();
|
||||
doc.add_u64(value_field, 2);
|
||||
@@ -399,7 +399,7 @@ pub mod tests {
|
||||
|
||||
// delete some of the documents
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.delete_term(term_0);
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
@@ -449,7 +449,7 @@ pub mod tests {
|
||||
|
||||
// delete everything else
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.delete_term(term_1);
|
||||
|
||||
assert!(index_writer.commit().is_ok());
|
||||
@@ -457,14 +457,25 @@ pub mod tests {
|
||||
index.load_searchers().unwrap();
|
||||
|
||||
let searcher = index.searcher();
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
|
||||
// finally, check that it's empty
|
||||
{
|
||||
let searchable_segment_ids = index
|
||||
.searchable_segment_ids()
|
||||
.expect("could not get index segment ids");
|
||||
assert!(searchable_segment_ids.is_empty());
|
||||
assert_eq!(searcher.num_docs(), 0);
|
||||
let mut segment_postings = segment_reader
|
||||
.inverted_index(term_2.field())
|
||||
.read_postings(&term_2, IndexRecordOption::Basic)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(segment_postings.skip_next(0), SkipResult::Reached);
|
||||
assert_eq!(segment_postings.doc(), 0);
|
||||
assert!(segment_reader.is_deleted(0));
|
||||
|
||||
let mut segment_postings = segment_reader
|
||||
.inverted_index(term_2.field())
|
||||
.read_postings(&term_2, IndexRecordOption::Basic)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(segment_postings.skip_next(num_docs), SkipResult::End);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -495,7 +506,7 @@ pub mod tests {
|
||||
let index = Index::create_in_ram(schema);
|
||||
let posting_list_size = 1_000_000;
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
for _ in 0..posting_list_size {
|
||||
let mut doc = Document::default();
|
||||
if rng.gen_bool(1f64 / 15f64) {
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
use super::stacker::{Addr, MemoryArena, TermHashMap};
|
||||
|
||||
use postings::recorder::{
|
||||
BufferLender, NothingRecorder, Recorder, TFAndPositionRecorder, TermFrequencyRecorder,
|
||||
};
|
||||
use postings::recorder::{NothingRecorder, Recorder, TFAndPositionRecorder, TermFrequencyRecorder};
|
||||
use postings::UnorderedTermId;
|
||||
use postings::{FieldSerializer, InvertedIndexSerializer};
|
||||
use schema::IndexRecordOption;
|
||||
@@ -215,7 +213,7 @@ pub trait PostingsWriter {
|
||||
|
||||
/// The `SpecializedPostingsWriter` is just here to remove dynamic
|
||||
/// dispatch to the recorder information.
|
||||
pub(crate) struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
|
||||
pub struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
|
||||
total_num_tokens: u64,
|
||||
_recorder_type: PhantomData<Rec>,
|
||||
}
|
||||
@@ -247,7 +245,8 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
|
||||
debug_assert!(term.as_slice().len() >= 4);
|
||||
self.total_num_tokens += 1;
|
||||
term_index.mutate_or_create(term, |opt_recorder: Option<Rec>| {
|
||||
if let Some(mut recorder) = opt_recorder {
|
||||
if opt_recorder.is_some() {
|
||||
let mut recorder = opt_recorder.unwrap();
|
||||
let current_doc = recorder.current_doc();
|
||||
if current_doc != doc {
|
||||
recorder.close_doc(heap);
|
||||
@@ -256,7 +255,7 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
|
||||
recorder.record_position(position, heap);
|
||||
recorder
|
||||
} else {
|
||||
let mut recorder = Rec::new();
|
||||
let mut recorder = Rec::new(heap);
|
||||
recorder.new_doc(doc, heap);
|
||||
recorder.record_position(position, heap);
|
||||
recorder
|
||||
@@ -271,11 +270,10 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
|
||||
termdict_heap: &MemoryArena,
|
||||
heap: &MemoryArena,
|
||||
) -> io::Result<()> {
|
||||
let mut buffer_lender = BufferLender::default();
|
||||
for &(term_bytes, addr, _) in term_addrs {
|
||||
let recorder: Rec = termdict_heap.read(addr);
|
||||
let recorder: Rec = unsafe { termdict_heap.read(addr) };
|
||||
serializer.new_term(&term_bytes[4..])?;
|
||||
recorder.serialize(&mut buffer_lender, serializer, heap)?;
|
||||
recorder.serialize(serializer, heap)?;
|
||||
serializer.close_term()?;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -1,51 +1,10 @@
|
||||
use super::stacker::{ExpUnrolledLinkedList, MemoryArena};
|
||||
use common::{read_u32_vint, write_u32_vint};
|
||||
use postings::FieldSerializer;
|
||||
use std::io;
|
||||
use std::{self, io};
|
||||
use DocId;
|
||||
|
||||
const EMPTY_ARRAY: [u32; 0] = [0u32; 0];
|
||||
const POSITION_END: u32 = 0;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct BufferLender {
|
||||
buffer_u8: Vec<u8>,
|
||||
buffer_u32: Vec<u32>,
|
||||
}
|
||||
|
||||
impl BufferLender {
|
||||
pub fn lend_u8(&mut self) -> &mut Vec<u8> {
|
||||
self.buffer_u8.clear();
|
||||
&mut self.buffer_u8
|
||||
}
|
||||
pub fn lend_all(&mut self) -> (&mut Vec<u8>, &mut Vec<u32>) {
|
||||
self.buffer_u8.clear();
|
||||
self.buffer_u32.clear();
|
||||
(&mut self.buffer_u8, &mut self.buffer_u32)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VInt32Reader<'a> {
|
||||
data: &'a [u8],
|
||||
}
|
||||
|
||||
impl<'a> VInt32Reader<'a> {
|
||||
fn new(data: &'a [u8]) -> VInt32Reader<'a> {
|
||||
VInt32Reader { data }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for VInt32Reader<'a> {
|
||||
type Item = u32;
|
||||
|
||||
fn next(&mut self) -> Option<u32> {
|
||||
if self.data.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(read_u32_vint(&mut self.data))
|
||||
}
|
||||
}
|
||||
}
|
||||
const POSITION_END: u32 = std::u32::MAX;
|
||||
|
||||
/// Recorder is in charge of recording relevant information about
|
||||
/// the presence of a term in a document.
|
||||
@@ -56,9 +15,9 @@ impl<'a> Iterator for VInt32Reader<'a> {
|
||||
/// * the document id
|
||||
/// * the term frequency
|
||||
/// * the term positions
|
||||
pub(crate) trait Recorder: Copy + 'static {
|
||||
pub trait Recorder: Copy {
|
||||
///
|
||||
fn new() -> Self;
|
||||
fn new(heap: &mut MemoryArena) -> Self;
|
||||
/// Returns the current document
|
||||
fn current_doc(&self) -> u32;
|
||||
/// Starts recording information about a new document
|
||||
@@ -70,12 +29,7 @@ pub(crate) trait Recorder: Copy + 'static {
|
||||
/// Close the document. It will help record the term frequency.
|
||||
fn close_doc(&mut self, heap: &mut MemoryArena);
|
||||
/// Pushes the postings information to the serializer.
|
||||
fn serialize(
|
||||
&self,
|
||||
buffer_lender: &mut BufferLender,
|
||||
serializer: &mut FieldSerializer,
|
||||
heap: &MemoryArena,
|
||||
) -> io::Result<()>;
|
||||
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()>;
|
||||
}
|
||||
|
||||
/// Only records the doc ids
|
||||
@@ -86,9 +40,9 @@ pub struct NothingRecorder {
|
||||
}
|
||||
|
||||
impl Recorder for NothingRecorder {
|
||||
fn new() -> Self {
|
||||
fn new(heap: &mut MemoryArena) -> Self {
|
||||
NothingRecorder {
|
||||
stack: ExpUnrolledLinkedList::new(),
|
||||
stack: ExpUnrolledLinkedList::new(heap),
|
||||
current_doc: u32::max_value(),
|
||||
}
|
||||
}
|
||||
@@ -99,23 +53,16 @@ impl Recorder for NothingRecorder {
|
||||
|
||||
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
|
||||
self.current_doc = doc;
|
||||
let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
|
||||
self.stack.push(doc, heap);
|
||||
}
|
||||
|
||||
fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {}
|
||||
|
||||
fn close_doc(&mut self, _heap: &mut MemoryArena) {}
|
||||
|
||||
fn serialize(
|
||||
&self,
|
||||
buffer_lender: &mut BufferLender,
|
||||
serializer: &mut FieldSerializer,
|
||||
heap: &MemoryArena,
|
||||
) -> io::Result<()> {
|
||||
let buffer = buffer_lender.lend_u8();
|
||||
self.stack.read_to_end(heap, buffer);
|
||||
for doc in VInt32Reader::new(&buffer[..]) {
|
||||
serializer.write_doc(doc as u32, 0u32, &EMPTY_ARRAY)?;
|
||||
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
|
||||
for doc in self.stack.iter(heap) {
|
||||
serializer.write_doc(doc, 0u32, &EMPTY_ARRAY)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -130,9 +77,9 @@ pub struct TermFrequencyRecorder {
|
||||
}
|
||||
|
||||
impl Recorder for TermFrequencyRecorder {
|
||||
fn new() -> Self {
|
||||
fn new(heap: &mut MemoryArena) -> Self {
|
||||
TermFrequencyRecorder {
|
||||
stack: ExpUnrolledLinkedList::new(),
|
||||
stack: ExpUnrolledLinkedList::new(heap),
|
||||
current_doc: u32::max_value(),
|
||||
current_tf: 0u32,
|
||||
}
|
||||
@@ -144,7 +91,7 @@ impl Recorder for TermFrequencyRecorder {
|
||||
|
||||
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
|
||||
self.current_doc = doc;
|
||||
let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
|
||||
self.stack.push(doc, heap);
|
||||
}
|
||||
|
||||
fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {
|
||||
@@ -153,24 +100,24 @@ impl Recorder for TermFrequencyRecorder {
|
||||
|
||||
fn close_doc(&mut self, heap: &mut MemoryArena) {
|
||||
debug_assert!(self.current_tf > 0);
|
||||
let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(heap));
|
||||
self.stack.push(self.current_tf, heap);
|
||||
self.current_tf = 0;
|
||||
}
|
||||
|
||||
fn serialize(
|
||||
&self,
|
||||
buffer_lender: &mut BufferLender,
|
||||
serializer: &mut FieldSerializer,
|
||||
heap: &MemoryArena,
|
||||
) -> io::Result<()> {
|
||||
let buffer = buffer_lender.lend_u8();
|
||||
self.stack.read_to_end(heap, buffer);
|
||||
let mut u32_it = VInt32Reader::new(&buffer[..]);
|
||||
while let Some(doc) = u32_it.next() {
|
||||
let term_freq = u32_it.next().unwrap_or(self.current_tf);
|
||||
serializer.write_doc(doc as u32, term_freq, &EMPTY_ARRAY)?;
|
||||
}
|
||||
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
|
||||
// the last document has not been closed...
|
||||
// its term freq is self.current_tf.
|
||||
let mut doc_iter = self
|
||||
.stack
|
||||
.iter(heap)
|
||||
.chain(Some(self.current_tf).into_iter());
|
||||
|
||||
while let Some(doc) = doc_iter.next() {
|
||||
let term_freq = doc_iter
|
||||
.next()
|
||||
.expect("The IndexWriter recorded a doc without a term freq.");
|
||||
serializer.write_doc(doc, term_freq, &EMPTY_ARRAY)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -181,10 +128,11 @@ pub struct TFAndPositionRecorder {
|
||||
stack: ExpUnrolledLinkedList,
|
||||
current_doc: DocId,
|
||||
}
|
||||
|
||||
impl Recorder for TFAndPositionRecorder {
|
||||
fn new() -> Self {
|
||||
fn new(heap: &mut MemoryArena) -> Self {
|
||||
TFAndPositionRecorder {
|
||||
stack: ExpUnrolledLinkedList::new(),
|
||||
stack: ExpUnrolledLinkedList::new(heap),
|
||||
current_doc: u32::max_value(),
|
||||
}
|
||||
}
|
||||
@@ -195,88 +143,33 @@ impl Recorder for TFAndPositionRecorder {
|
||||
|
||||
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
|
||||
self.current_doc = doc;
|
||||
let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
|
||||
self.stack.push(doc, heap);
|
||||
}
|
||||
|
||||
fn record_position(&mut self, position: u32, heap: &mut MemoryArena) {
|
||||
let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(heap));
|
||||
self.stack.push(position, heap);
|
||||
}
|
||||
|
||||
fn close_doc(&mut self, heap: &mut MemoryArena) {
|
||||
let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(heap));
|
||||
self.stack.push(POSITION_END, heap);
|
||||
}
|
||||
|
||||
fn serialize(
|
||||
&self,
|
||||
buffer_lender: &mut BufferLender,
|
||||
serializer: &mut FieldSerializer,
|
||||
heap: &MemoryArena,
|
||||
) -> io::Result<()> {
|
||||
let (buffer_u8, buffer_positions) = buffer_lender.lend_all();
|
||||
self.stack.read_to_end(heap, buffer_u8);
|
||||
let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
|
||||
while let Some(doc) = u32_it.next() {
|
||||
let mut prev_position_plus_one = 1u32;
|
||||
buffer_positions.clear();
|
||||
loop {
|
||||
match u32_it.next() {
|
||||
Some(POSITION_END) | None => {
|
||||
break;
|
||||
}
|
||||
Some(position_plus_one) => {
|
||||
let delta_position = position_plus_one - prev_position_plus_one;
|
||||
buffer_positions.push(delta_position);
|
||||
prev_position_plus_one = position_plus_one;
|
||||
}
|
||||
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
|
||||
let mut doc_positions = Vec::with_capacity(100);
|
||||
let mut positions_iter = self.stack.iter(heap);
|
||||
while let Some(doc) = positions_iter.next() {
|
||||
let mut prev_position = 0;
|
||||
doc_positions.clear();
|
||||
for position in &mut positions_iter {
|
||||
if position == POSITION_END {
|
||||
break;
|
||||
} else {
|
||||
doc_positions.push(position - prev_position);
|
||||
prev_position = position;
|
||||
}
|
||||
}
|
||||
serializer.write_doc(doc, buffer_positions.len() as u32, &buffer_positions)?;
|
||||
serializer.write_doc(doc, doc_positions.len() as u32, &doc_positions)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::write_u32_vint;
|
||||
use super::BufferLender;
|
||||
use super::VInt32Reader;
|
||||
|
||||
#[test]
|
||||
fn test_buffer_lender() {
|
||||
let mut buffer_lender = BufferLender::default();
|
||||
{
|
||||
let buf = buffer_lender.lend_u8();
|
||||
assert!(buf.is_empty());
|
||||
buf.push(1u8);
|
||||
}
|
||||
{
|
||||
let buf = buffer_lender.lend_u8();
|
||||
assert!(buf.is_empty());
|
||||
buf.push(1u8);
|
||||
}
|
||||
{
|
||||
let (_, buf) = buffer_lender.lend_all();
|
||||
assert!(buf.is_empty());
|
||||
buf.push(1u32);
|
||||
}
|
||||
{
|
||||
let (_, buf) = buffer_lender.lend_all();
|
||||
assert!(buf.is_empty());
|
||||
buf.push(1u32);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_vint_u32() {
|
||||
let mut buffer = vec![];
|
||||
let vals = [0, 1, 324_234_234, u32::max_value()];
|
||||
for &i in &vals {
|
||||
assert!(write_u32_vint(i, &mut buffer).is_ok());
|
||||
}
|
||||
assert_eq!(buffer.len(), 1 + 1 + 5 + 5);
|
||||
let res: Vec<u32> = VInt32Reader::new(&buffer[..]).collect();
|
||||
assert_eq!(&res[..], &vals[..]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,17 +124,21 @@ impl SegmentPostings {
|
||||
}
|
||||
|
||||
fn exponential_search(target: u32, arr: &[u32]) -> (usize, usize) {
|
||||
let mut start = 0;
|
||||
let end = arr.len();
|
||||
debug_assert!(arr.len() <= 128);
|
||||
debug_assert!(target <= arr[end - 1]);
|
||||
let mut begin = 0;
|
||||
for &pivot in [1,3,7,15,31,63].iter().take_while(|&&el| el < end) {
|
||||
if arr[pivot] > target {
|
||||
return (begin, pivot);
|
||||
let mut jump = 1;
|
||||
loop {
|
||||
let new = start + jump;
|
||||
if new >= end {
|
||||
return (start, end);
|
||||
}
|
||||
begin = pivot;
|
||||
if arr[new] > target {
|
||||
return (start, new);
|
||||
}
|
||||
start = new;
|
||||
jump *= 2;
|
||||
}
|
||||
(begin, end)
|
||||
}
|
||||
|
||||
/// Search the first index containing an element greater or equal to the target.
|
||||
@@ -748,7 +752,7 @@ mod tests {
|
||||
let int_field = schema_builder.add_u64_field("id", INT_INDEXED);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
let mut last_doc = 0u32;
|
||||
for &doc in docs {
|
||||
for _ in last_doc..doc {
|
||||
@@ -819,7 +823,7 @@ mod tests {
|
||||
let int_field = schema_builder.add_u64_field("id", INT_INDEXED);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
// create two postings list, one containg even number,
|
||||
// the other containing odd numbers.
|
||||
for i in 0..6 {
|
||||
|
||||
@@ -1,37 +1,28 @@
|
||||
use super::{Addr, MemoryArena};
|
||||
|
||||
use postings::stacker::memory_arena::load;
|
||||
use postings::stacker::memory_arena::store;
|
||||
use std::io;
|
||||
use common::is_power_of_2;
|
||||
use std::mem;
|
||||
|
||||
const MAX_BLOCK_LEN: u32 = 1u32 << 15;
|
||||
const FIRST_BLOCK: usize = 16;
|
||||
const INLINED_BLOCK_LEN: usize = FIRST_BLOCK + mem::size_of::<Addr>();
|
||||
|
||||
enum CapacityResult {
|
||||
Available(u32),
|
||||
NeedAlloc(u32),
|
||||
}
|
||||
const FIRST_BLOCK: u32 = 4u32;
|
||||
|
||||
fn len_to_capacity(len: u32) -> CapacityResult {
|
||||
#[inline]
|
||||
pub fn jump_needed(len: u32) -> Option<usize> {
|
||||
match len {
|
||||
0...15 => CapacityResult::Available(FIRST_BLOCK as u32 - len),
|
||||
16...MAX_BLOCK_LEN => {
|
||||
let cap = 1 << (32u32 - (len - 1u32).leading_zeros());
|
||||
let available = cap - len;
|
||||
if available == 0 {
|
||||
CapacityResult::NeedAlloc(len)
|
||||
0...3 => None,
|
||||
4...MAX_BLOCK_LEN => {
|
||||
if is_power_of_2(len as usize) {
|
||||
Some(len as usize)
|
||||
} else {
|
||||
CapacityResult::Available(available)
|
||||
None
|
||||
}
|
||||
}
|
||||
n => {
|
||||
let available = n % MAX_BLOCK_LEN;
|
||||
if available == 0 {
|
||||
CapacityResult::NeedAlloc(MAX_BLOCK_LEN)
|
||||
if n % MAX_BLOCK_LEN == 0 {
|
||||
Some(MAX_BLOCK_LEN as usize)
|
||||
} else {
|
||||
CapacityResult::Available(MAX_BLOCK_LEN - available)
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -61,119 +52,82 @@ fn len_to_capacity(len: u32) -> CapacityResult {
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct ExpUnrolledLinkedList {
|
||||
len: u32,
|
||||
head: Addr,
|
||||
tail: Addr,
|
||||
inlined_data: [u8; INLINED_BLOCK_LEN as usize],
|
||||
}
|
||||
|
||||
pub struct ExpUnrolledLinkedListWriter<'a> {
|
||||
eull: &'a mut ExpUnrolledLinkedList,
|
||||
heap: &'a mut MemoryArena,
|
||||
}
|
||||
|
||||
fn ensure_capacity<'a>(
|
||||
eull: &'a mut ExpUnrolledLinkedList,
|
||||
heap: &'a mut MemoryArena,
|
||||
) -> &'a mut [u8] {
|
||||
if eull.len <= FIRST_BLOCK as u32 {
|
||||
// We are still hitting the inline block.
|
||||
if eull.len < FIRST_BLOCK as u32 {
|
||||
return &mut eull.inlined_data[eull.len as usize..FIRST_BLOCK];
|
||||
}
|
||||
// We need to allocate a new block!
|
||||
let new_block_addr: Addr = heap.allocate_space(FIRST_BLOCK + mem::size_of::<Addr>());
|
||||
store(&mut eull.inlined_data[FIRST_BLOCK..], new_block_addr);
|
||||
eull.tail = new_block_addr;
|
||||
return heap.slice_mut(eull.tail, FIRST_BLOCK);
|
||||
}
|
||||
let len = match len_to_capacity(eull.len) {
|
||||
CapacityResult::NeedAlloc(new_block_len) => {
|
||||
let new_block_addr: Addr =
|
||||
heap.allocate_space(new_block_len as usize + mem::size_of::<Addr>());
|
||||
heap.write_at(eull.tail, new_block_addr);
|
||||
eull.tail = new_block_addr;
|
||||
new_block_len
|
||||
}
|
||||
CapacityResult::Available(available) => available,
|
||||
};
|
||||
heap.slice_mut(eull.tail, len as usize)
|
||||
}
|
||||
|
||||
impl<'a> ExpUnrolledLinkedListWriter<'a> {
|
||||
pub fn extend_from_slice(&mut self, mut buf: &[u8]) {
|
||||
if buf.is_empty() {
|
||||
// we need to cut early, because `ensure_capacity`
|
||||
// allocates if there is no capacity at all right now.
|
||||
return;
|
||||
}
|
||||
while !buf.is_empty() {
|
||||
let add_len: usize;
|
||||
{
|
||||
let output_buf = ensure_capacity(self.eull, self.heap);
|
||||
add_len = buf.len().min(output_buf.len());
|
||||
output_buf[..add_len].copy_from_slice(&buf[..add_len]);
|
||||
}
|
||||
self.eull.len += add_len as u32;
|
||||
self.eull.tail = self.eull.tail.offset(add_len as u32);
|
||||
buf = &buf[add_len..];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> io::Write for ExpUnrolledLinkedListWriter<'a> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
// There is no use case to only write the capacity.
|
||||
// This is not IO after all, so we write the whole
|
||||
// buffer even if the contract of `.write` is looser.
|
||||
self.extend_from_slice(buf);
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
|
||||
self.extend_from_slice(buf);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ExpUnrolledLinkedList {
|
||||
pub fn new() -> ExpUnrolledLinkedList {
|
||||
pub fn new(heap: &mut MemoryArena) -> ExpUnrolledLinkedList {
|
||||
let addr = heap.allocate_space((FIRST_BLOCK as usize) * mem::size_of::<u32>());
|
||||
ExpUnrolledLinkedList {
|
||||
len: 0u32,
|
||||
tail: Addr::null_pointer(),
|
||||
inlined_data: [0u8; INLINED_BLOCK_LEN as usize],
|
||||
head: addr,
|
||||
tail: addr,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn writer<'a>(&'a mut self, heap: &'a mut MemoryArena) -> ExpUnrolledLinkedListWriter<'a> {
|
||||
ExpUnrolledLinkedListWriter { eull: self, heap }
|
||||
pub fn iter<'a>(&self, heap: &'a MemoryArena) -> ExpUnrolledLinkedListIterator<'a> {
|
||||
ExpUnrolledLinkedListIterator {
|
||||
heap,
|
||||
addr: self.head,
|
||||
len: self.len,
|
||||
consumed: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_to_end(&self, heap: &MemoryArena, output: &mut Vec<u8>) {
|
||||
let len = self.len as usize;
|
||||
if len <= FIRST_BLOCK {
|
||||
output.extend_from_slice(&self.inlined_data[..len]);
|
||||
return;
|
||||
/// Appends a new element to the current stack.
|
||||
///
|
||||
/// If the current block end is reached, a new block is allocated.
|
||||
pub fn push(&mut self, val: u32, heap: &mut MemoryArena) {
|
||||
self.len += 1;
|
||||
if let Some(new_block_len) = jump_needed(self.len) {
|
||||
// We need to allocate another block.
|
||||
// We also allocate an extra `u32` to store the pointer
|
||||
// to the future next block.
|
||||
let new_block_size: usize = (new_block_len + 1) * mem::size_of::<u32>();
|
||||
let new_block_addr: Addr = heap.allocate_space(new_block_size);
|
||||
unsafe {
|
||||
// logic
|
||||
heap.write(self.tail, new_block_addr)
|
||||
};
|
||||
self.tail = new_block_addr;
|
||||
}
|
||||
output.extend_from_slice(&self.inlined_data[..FIRST_BLOCK]);
|
||||
let mut cur = FIRST_BLOCK;
|
||||
let mut addr = load(&self.inlined_data[FIRST_BLOCK..]);
|
||||
loop {
|
||||
let cap = match len_to_capacity(cur as u32) {
|
||||
CapacityResult::Available(capacity) => capacity,
|
||||
CapacityResult::NeedAlloc(capacity) => capacity,
|
||||
} as usize;
|
||||
let data = heap.slice(addr, cap);
|
||||
if cur + cap >= len {
|
||||
output.extend_from_slice(&data[..(len - cur)]);
|
||||
return;
|
||||
}
|
||||
output.extend_from_slice(data);
|
||||
cur += cap;
|
||||
addr = heap.read(addr.offset(cap as u32));
|
||||
unsafe {
|
||||
// logic
|
||||
heap.write(self.tail, val);
|
||||
self.tail = self.tail.offset(mem::size_of::<u32>() as u32);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ExpUnrolledLinkedListIterator<'a> {
|
||||
heap: &'a MemoryArena,
|
||||
addr: Addr,
|
||||
len: u32,
|
||||
consumed: u32,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for ExpUnrolledLinkedListIterator<'a> {
|
||||
type Item = u32;
|
||||
|
||||
fn next(&mut self) -> Option<u32> {
|
||||
if self.consumed == self.len {
|
||||
None
|
||||
} else {
|
||||
self.consumed += 1;
|
||||
let addr: Addr = if jump_needed(self.consumed).is_some() {
|
||||
unsafe {
|
||||
// logic
|
||||
self.heap.read(self.addr)
|
||||
}
|
||||
} else {
|
||||
self.addr
|
||||
};
|
||||
self.addr = addr.offset(mem::size_of::<u32>() as u32);
|
||||
Some(unsafe {
|
||||
// logic
|
||||
self.heap.read(addr)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -182,134 +136,46 @@ impl ExpUnrolledLinkedList {
|
||||
mod tests {
|
||||
|
||||
use super::super::MemoryArena;
|
||||
use super::len_to_capacity;
|
||||
use super::jump_needed;
|
||||
use super::*;
|
||||
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
|
||||
|
||||
#[test]
|
||||
#[test]
|
||||
fn test_stack() {
|
||||
let mut heap = MemoryArena::new();
|
||||
let mut stack = ExpUnrolledLinkedList::new();
|
||||
stack.writer(&mut heap).extend_from_slice(&[1u8]);
|
||||
stack.writer(&mut heap).extend_from_slice(&[2u8]);
|
||||
stack.writer(&mut heap).extend_from_slice(&[3u8, 4u8]);
|
||||
stack.writer(&mut heap).extend_from_slice(&[5u8]);
|
||||
let mut stack = ExpUnrolledLinkedList::new(&mut heap);
|
||||
stack.push(1u32, &mut heap);
|
||||
stack.push(2u32, &mut heap);
|
||||
stack.push(4u32, &mut heap);
|
||||
stack.push(8u32, &mut heap);
|
||||
{
|
||||
let mut buffer = Vec::new();
|
||||
stack.read_to_end(&heap, &mut buffer);
|
||||
assert_eq!(&buffer[..], &[1u8, 2u8, 3u8, 4u8, 5u8]);
|
||||
let mut it = stack.iter(&heap);
|
||||
assert_eq!(it.next().unwrap(), 1u32);
|
||||
assert_eq!(it.next().unwrap(), 2u32);
|
||||
assert_eq!(it.next().unwrap(), 4u32);
|
||||
assert_eq!(it.next().unwrap(), 8u32);
|
||||
assert!(it.next().is_none());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stack_long() {
|
||||
let mut heap = MemoryArena::new();
|
||||
let mut stack = ExpUnrolledLinkedList::new();
|
||||
let source: Vec<u32> = (0..100).collect();
|
||||
for &el in &source {
|
||||
assert!(stack
|
||||
.writer(&mut heap)
|
||||
.write_u32::<LittleEndian>(el)
|
||||
.is_ok());
|
||||
}
|
||||
let mut buffer = Vec::new();
|
||||
stack.read_to_end(&heap, &mut buffer);
|
||||
let mut result = vec![];
|
||||
let mut remaining = &buffer[..];
|
||||
while !remaining.is_empty() {
|
||||
result.push(LittleEndian::read_u32(&remaining[..4]));
|
||||
remaining = &remaining[4..];
|
||||
}
|
||||
assert_eq!(&result[..], &source[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stack_interlaced() {
|
||||
let mut heap = MemoryArena::new();
|
||||
let mut stack = ExpUnrolledLinkedList::new();
|
||||
let mut stack2 = ExpUnrolledLinkedList::new();
|
||||
|
||||
let mut vec1: Vec<u8> = vec![];
|
||||
let mut vec2: Vec<u8> = vec![];
|
||||
|
||||
for i in 0..9 {
|
||||
assert!(stack.writer(&mut heap).write_u32::<LittleEndian>(i).is_ok());
|
||||
assert!(vec1.write_u32::<LittleEndian>(i).is_ok());
|
||||
if i % 2 == 0 {
|
||||
assert!(stack2
|
||||
.writer(&mut heap)
|
||||
.write_u32::<LittleEndian>(i)
|
||||
.is_ok());
|
||||
assert!(vec2.write_u32::<LittleEndian>(i).is_ok());
|
||||
}
|
||||
}
|
||||
let mut res1 = vec![];
|
||||
let mut res2 = vec![];
|
||||
stack.read_to_end(&heap, &mut res1);
|
||||
stack2.read_to_end(&heap, &mut res2);
|
||||
assert_eq!(&vec1[..], &res1[..]);
|
||||
assert_eq!(&vec2[..], &res2[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_jump_if_needed() {
|
||||
let mut available = 16u32;
|
||||
for i in 0..10_000_000 {
|
||||
match len_to_capacity(i) {
|
||||
CapacityResult::NeedAlloc(cap) => {
|
||||
assert_eq!(available, 0, "Failed len={}: Expected 0 got {}", i, cap);
|
||||
available = cap;
|
||||
}
|
||||
CapacityResult::Available(cap) => {
|
||||
assert_eq!(
|
||||
available, cap,
|
||||
"Failed len={}: Expected {} Got {}",
|
||||
i, available, cap
|
||||
);
|
||||
}
|
||||
}
|
||||
available -= 1;
|
||||
let mut block_len = 4u32;
|
||||
let mut i = 0;
|
||||
while i < 10_000_000 {
|
||||
assert!(jump_needed(i + block_len - 1).is_none());
|
||||
assert!(jump_needed(i + block_len + 1).is_none());
|
||||
assert!(jump_needed(i + block_len).is_some());
|
||||
let new_block_len = jump_needed(i + block_len).unwrap();
|
||||
i += block_len;
|
||||
block_len = new_block_len as u32;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_jump_if_needed_progression() {
|
||||
let mut v = vec![];
|
||||
for i in 0.. {
|
||||
if v.len() >= 10 {
|
||||
break;
|
||||
}
|
||||
match len_to_capacity(i) {
|
||||
CapacityResult::NeedAlloc(cap) => {
|
||||
v.push((i, cap));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
assert_eq!(
|
||||
&v[..],
|
||||
&[
|
||||
(16, 16),
|
||||
(32, 32),
|
||||
(64, 64),
|
||||
(128, 128),
|
||||
(256, 256),
|
||||
(512, 512),
|
||||
(1024, 1024),
|
||||
(2048, 2048),
|
||||
(4096, 4096),
|
||||
(8192, 8192)
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
mod bench {
|
||||
use super::super::MemoryArena;
|
||||
use super::ExpUnrolledLinkedList;
|
||||
use byteorder::{NativeEndian, WriteBytesExt};
|
||||
use test::Bencher;
|
||||
|
||||
const NUM_STACK: usize = 10_000;
|
||||
@@ -337,13 +203,13 @@ mod bench {
|
||||
let mut heap = MemoryArena::new();
|
||||
let mut stacks = Vec::with_capacity(100);
|
||||
for _ in 0..NUM_STACK {
|
||||
let mut stack = ExpUnrolledLinkedList::new();
|
||||
let mut stack = ExpUnrolledLinkedList::new(&mut heap);
|
||||
stacks.push(stack);
|
||||
}
|
||||
for s in 0..NUM_STACK {
|
||||
for i in 0u32..STACK_SIZE {
|
||||
let t = s * 392017 % NUM_STACK;
|
||||
let _ = stacks[t].writer(&mut heap).write_u32::<NativeEndian>(i);
|
||||
stacks[t].push(i, &mut heap);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -37,7 +37,7 @@ const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 1 MB large
|
||||
/// page of memory.
|
||||
///
|
||||
/// The last 20 bits are an address within this page of memory.
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct Addr(u32);
|
||||
|
||||
impl Addr {
|
||||
@@ -69,16 +69,32 @@ impl Addr {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn store<Item: Copy + 'static>(dest: &mut [u8], val: Item) {
|
||||
assert_eq!(dest.len(), std::mem::size_of::<Item>());
|
||||
unsafe {
|
||||
ptr::write_unaligned(dest.as_mut_ptr() as *mut Item, val);
|
||||
}
|
||||
/// Trait required for an object to be `storable`.
|
||||
///
|
||||
/// # Warning
|
||||
///
|
||||
/// Most of the time you should not implement this trait,
|
||||
/// and only use the `MemoryArena` with object implementing `Copy`.
|
||||
///
|
||||
/// `ArenaStorable` is used in `tantivy` to force
|
||||
/// a `Copy` object and a `slice` of data to be stored contiguously.
|
||||
pub trait ArenaStorable {
|
||||
fn num_bytes(&self) -> usize;
|
||||
unsafe fn write_into(self, arena: &mut MemoryArena, addr: Addr);
|
||||
}
|
||||
|
||||
pub fn load<Item: Copy + 'static>(data: &[u8]) -> Item {
|
||||
assert_eq!(data.len(), std::mem::size_of::<Item>());
|
||||
unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) }
|
||||
impl<V> ArenaStorable for V
|
||||
where
|
||||
V: Copy,
|
||||
{
|
||||
fn num_bytes(&self) -> usize {
|
||||
mem::size_of::<V>()
|
||||
}
|
||||
|
||||
unsafe fn write_into(self, arena: &mut MemoryArena, addr: Addr) {
|
||||
let dst_ptr = arena.get_mut_ptr(addr) as *mut V;
|
||||
ptr::write_unaligned(dst_ptr, self);
|
||||
}
|
||||
}
|
||||
|
||||
/// The `MemoryArena`
|
||||
@@ -110,9 +126,47 @@ impl MemoryArena {
|
||||
self.pages.len() * PAGE_SIZE
|
||||
}
|
||||
|
||||
pub fn write_at<Item: Copy + 'static>(&mut self, addr: Addr, val: Item) {
|
||||
let dest = self.slice_mut(addr, std::mem::size_of::<Item>());
|
||||
store(dest, val);
|
||||
/// Writes a slice at the given address, assuming the
|
||||
/// memory was allocated beforehands.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// May panic or corrupt the heap if he space was not
|
||||
/// properly allocated beforehands.
|
||||
pub fn write_bytes<B: AsRef<[u8]>>(&mut self, addr: Addr, data: B) {
|
||||
let bytes = data.as_ref();
|
||||
self.pages[addr.page_id()]
|
||||
.get_mut_slice(addr.page_local_addr(), bytes.len())
|
||||
.copy_from_slice(bytes);
|
||||
}
|
||||
|
||||
/// Returns the `len` bytes starting at `addr`
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the memory has not been allocated beforehands.
|
||||
pub fn read_slice(&self, addr: Addr, len: usize) -> &[u8] {
|
||||
self.pages[addr.page_id()].get_slice(addr.page_local_addr(), len)
|
||||
}
|
||||
|
||||
unsafe fn get_mut_ptr(&mut self, addr: Addr) -> *mut u8 {
|
||||
self.pages[addr.page_id()].get_mut_ptr(addr.page_local_addr())
|
||||
}
|
||||
|
||||
/// Stores an item's data in the heap
|
||||
///
|
||||
/// It allocates the `Item` beforehands.
|
||||
pub fn store<Item: ArenaStorable>(&mut self, val: Item) -> Addr {
|
||||
let num_bytes = val.num_bytes();
|
||||
let addr = self.allocate_space(num_bytes);
|
||||
unsafe {
|
||||
self.write(addr, val);
|
||||
};
|
||||
addr
|
||||
}
|
||||
|
||||
pub unsafe fn write<Item: ArenaStorable>(&mut self, addr: Addr, val: Item) {
|
||||
val.write_into(self, addr)
|
||||
}
|
||||
|
||||
/// Read an item in the heap at the given `address`.
|
||||
@@ -120,21 +174,9 @@ impl MemoryArena {
|
||||
/// # Panics
|
||||
///
|
||||
/// If the address is erroneous
|
||||
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
|
||||
load(self.slice(addr, mem::size_of::<Item>()))
|
||||
}
|
||||
|
||||
pub fn slice(&self, addr: Addr, len: usize) -> &[u8] {
|
||||
self.pages[addr.page_id()].slice(addr.page_local_addr(), len)
|
||||
}
|
||||
|
||||
pub fn slice_from(&self, addr: Addr) -> &[u8] {
|
||||
self.pages[addr.page_id()].slice_from(addr.page_local_addr())
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn slice_mut(&mut self, addr: Addr, len: usize) -> &mut [u8] {
|
||||
self.pages[addr.page_id()].slice_mut(addr.page_local_addr(), len)
|
||||
pub unsafe fn read<Item: Copy>(&self, addr: Addr) -> Item {
|
||||
let ptr = self.pages[addr.page_id()].get_ptr(addr.page_local_addr());
|
||||
ptr::read_unaligned(ptr as *const Item)
|
||||
}
|
||||
|
||||
/// Allocates `len` bytes and returns the allocated address.
|
||||
@@ -155,10 +197,14 @@ struct Page {
|
||||
|
||||
impl Page {
|
||||
fn new(page_id: usize) -> Page {
|
||||
let mut data: Vec<u8> = Vec::with_capacity(PAGE_SIZE);
|
||||
unsafe {
|
||||
data.set_len(PAGE_SIZE);
|
||||
} // avoid initializing page
|
||||
Page {
|
||||
page_id,
|
||||
len: 0,
|
||||
data: vec![0u8; PAGE_SIZE].into_boxed_slice(),
|
||||
data: data.into_boxed_slice(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -167,18 +213,14 @@ impl Page {
|
||||
len + self.len <= PAGE_SIZE
|
||||
}
|
||||
|
||||
fn slice(&self, local_addr: usize, len: usize) -> &[u8] {
|
||||
&self.slice_from(local_addr)[..len]
|
||||
}
|
||||
|
||||
fn slice_from(&self, local_addr: usize) -> &[u8] {
|
||||
&self.data[local_addr..]
|
||||
}
|
||||
|
||||
fn slice_mut(&mut self, local_addr: usize, len: usize) -> &mut [u8] {
|
||||
fn get_mut_slice(&mut self, local_addr: usize, len: usize) -> &mut [u8] {
|
||||
&mut self.data[local_addr..][..len]
|
||||
}
|
||||
|
||||
fn get_slice(&self, local_addr: usize, len: usize) -> &[u8] {
|
||||
&self.data[local_addr..][..len]
|
||||
}
|
||||
|
||||
fn allocate_space(&mut self, len: usize) -> Option<Addr> {
|
||||
if self.is_available(len) {
|
||||
let addr = Addr::new(self.page_id, self.len);
|
||||
@@ -188,6 +230,16 @@ impl Page {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) unsafe fn get_ptr(&self, addr: usize) -> *const u8 {
|
||||
self.data.as_ptr().add(addr)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) unsafe fn get_mut_ptr(&mut self, addr: usize) -> *mut u8 {
|
||||
self.data.as_mut_ptr().add(addr)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -202,13 +254,13 @@ mod tests {
|
||||
let b = b"happy tax payer";
|
||||
|
||||
let addr_a = arena.allocate_space(a.len());
|
||||
arena.slice_mut(addr_a, a.len()).copy_from_slice(a);
|
||||
arena.write_bytes(addr_a, a);
|
||||
|
||||
let addr_b = arena.allocate_space(b.len());
|
||||
arena.slice_mut(addr_b, b.len()).copy_from_slice(b);
|
||||
arena.write_bytes(addr_b, b);
|
||||
|
||||
assert_eq!(arena.slice(addr_a, a.len()), a);
|
||||
assert_eq!(arena.slice(addr_b, b.len()), b);
|
||||
assert_eq!(arena.read_slice(addr_a, a.len()), a);
|
||||
assert_eq!(arena.read_slice(addr_b, b.len()), b);
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
@@ -231,15 +283,9 @@ mod tests {
|
||||
b: 221,
|
||||
c: 12,
|
||||
};
|
||||
|
||||
let num_bytes = std::mem::size_of::<MyTest>();
|
||||
let addr_a = arena.allocate_space(num_bytes);
|
||||
arena.write_at(addr_a, a);
|
||||
|
||||
let addr_b = arena.allocate_space(num_bytes);
|
||||
arena.write_at(addr_b, b);
|
||||
|
||||
assert_eq!(arena.read::<MyTest>(addr_a), a);
|
||||
assert_eq!(arena.read::<MyTest>(addr_b), b);
|
||||
let addr_a = arena.store(a);
|
||||
let addr_b = arena.store(b);
|
||||
assert_eq!(unsafe { arena.read::<MyTest>(addr_a) }, a);
|
||||
assert_eq!(unsafe { arena.read::<MyTest>(addr_b) }, b);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,5 +3,5 @@ mod memory_arena;
|
||||
mod term_hashmap;
|
||||
|
||||
pub use self::expull::ExpUnrolledLinkedList;
|
||||
pub use self::memory_arena::{Addr, MemoryArena};
|
||||
pub use self::memory_arena::{Addr, ArenaStorable, MemoryArena};
|
||||
pub use self::term_hashmap::{compute_table_size, TermHashMap};
|
||||
|
||||
@@ -2,15 +2,39 @@ extern crate murmurhash32;
|
||||
|
||||
use self::murmurhash32::murmurhash2;
|
||||
|
||||
use super::{Addr, MemoryArena};
|
||||
use byteorder::{ByteOrder, NativeEndian};
|
||||
use postings::stacker::memory_arena::store;
|
||||
use super::{Addr, ArenaStorable, MemoryArena};
|
||||
use std::iter;
|
||||
use std::mem;
|
||||
use std::slice;
|
||||
|
||||
pub type BucketId = usize;
|
||||
|
||||
struct KeyBytesValue<'a, V> {
|
||||
key: &'a [u8],
|
||||
value: V,
|
||||
}
|
||||
|
||||
impl<'a, V> KeyBytesValue<'a, V> {
|
||||
fn new(key: &'a [u8], value: V) -> KeyBytesValue<'a, V> {
|
||||
KeyBytesValue { key, value }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, V> ArenaStorable for KeyBytesValue<'a, V>
|
||||
where
|
||||
V: ArenaStorable,
|
||||
{
|
||||
fn num_bytes(&self) -> usize {
|
||||
0u16.num_bytes() + self.key.len() + self.value.num_bytes()
|
||||
}
|
||||
|
||||
unsafe fn write_into(self, arena: &mut MemoryArena, addr: Addr) {
|
||||
arena.write(addr, self.key.len() as u16);
|
||||
arena.write_bytes(addr.offset(2), self.key);
|
||||
arena.write(addr.offset(2 + self.key.len() as u32), self.value);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the actual memory size in bytes
|
||||
/// required to create a table of size $2^num_bits$.
|
||||
pub fn compute_table_size(num_bits: usize) -> usize {
|
||||
@@ -90,7 +114,8 @@ impl<'a> Iterator for Iter<'a> {
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.inner.next().cloned().map(move |bucket: usize| {
|
||||
let kv = self.hashmap.table[bucket];
|
||||
let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr);
|
||||
let (key, offset): (&'a [u8], Addr) =
|
||||
unsafe { self.hashmap.get_key_value(kv.key_value_addr) };
|
||||
(key, offset, bucket as BucketId)
|
||||
})
|
||||
}
|
||||
@@ -121,22 +146,12 @@ impl TermHashMap {
|
||||
self.table.len() < self.occupied.len() * 3
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) {
|
||||
let data = self.heap.slice_from(addr);
|
||||
let key_bytes_len = NativeEndian::read_u16(data) as usize;
|
||||
let key_bytes: &[u8] = &data[2..][..key_bytes_len];
|
||||
(key_bytes, addr.offset(2u32 + key_bytes_len as u32))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn get_value_addr_if_key_match(&self, target_key: &[u8], addr: Addr) -> Option<Addr> {
|
||||
let (stored_key, value_addr) = self.get_key_value(addr);
|
||||
if stored_key == target_key {
|
||||
Some(value_addr)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
unsafe fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) {
|
||||
let key_bytes_len = self.heap.read::<u16>(addr) as usize;
|
||||
let key_addr = addr.offset(2u32);
|
||||
let key_bytes: &[u8] = self.heap.read_slice(key_addr, key_bytes_len);
|
||||
let val_addr: Addr = key_addr.offset(key_bytes.len() as u32);
|
||||
(key_bytes, val_addr)
|
||||
}
|
||||
|
||||
pub fn set_bucket(&mut self, hash: u32, key_value_addr: Addr, bucket: usize) {
|
||||
@@ -187,7 +202,7 @@ impl TermHashMap {
|
||||
pub fn mutate_or_create<S, V, TMutator>(&mut self, key: S, mut updater: TMutator) -> BucketId
|
||||
where
|
||||
S: AsRef<[u8]>,
|
||||
V: Copy + 'static,
|
||||
V: Copy,
|
||||
TMutator: FnMut(Option<V>) -> V,
|
||||
{
|
||||
if self.is_saturated() {
|
||||
@@ -201,25 +216,22 @@ impl TermHashMap {
|
||||
let kv: KeyValue = self.table[bucket];
|
||||
if kv.is_empty() {
|
||||
let val = updater(None);
|
||||
let num_bytes =
|
||||
std::mem::size_of::<u16>() + key_bytes.len() + std::mem::size_of::<V>();
|
||||
let key_addr = self.heap.allocate_space(num_bytes);
|
||||
{
|
||||
let data = self.heap.slice_mut(key_addr, num_bytes);
|
||||
NativeEndian::write_u16(data, key_bytes.len() as u16);
|
||||
let stop = 2 + key_bytes.len();
|
||||
data[2..stop].copy_from_slice(key_bytes);
|
||||
store(&mut data[stop..], val);
|
||||
}
|
||||
let key_addr = self.heap.store(KeyBytesValue::new(key_bytes, val));
|
||||
self.set_bucket(hash, key_addr, bucket);
|
||||
return bucket as BucketId;
|
||||
} else if kv.hash == hash {
|
||||
if let Some(val_addr) =
|
||||
self.get_value_addr_if_key_match(key_bytes, kv.key_value_addr)
|
||||
{
|
||||
let v = self.heap.read(val_addr);
|
||||
let new_v = updater(Some(v));
|
||||
self.heap.write_at(val_addr, new_v);
|
||||
let (key_matches, val_addr) = {
|
||||
let (stored_key, val_addr): (&[u8], Addr) =
|
||||
unsafe { self.get_key_value(kv.key_value_addr) };
|
||||
(stored_key == key_bytes, val_addr)
|
||||
};
|
||||
if key_matches {
|
||||
unsafe {
|
||||
// logic
|
||||
let v = self.heap.read(val_addr);
|
||||
let new_v = updater(Some(v));
|
||||
self.heap.write(val_addr, new_v);
|
||||
};
|
||||
return bucket as BucketId;
|
||||
}
|
||||
}
|
||||
@@ -227,6 +239,24 @@ impl TermHashMap {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
mod bench {
|
||||
use super::murmurhash2::murmurhash2;
|
||||
use test::Bencher;
|
||||
|
||||
#[bench]
|
||||
fn bench_murmurhash2(b: &mut Bencher) {
|
||||
let keys: [&'static str; 3] = ["wer qwe qwe qwe ", "werbq weqweqwe2 ", "weraq weqweqwe3 "];
|
||||
b.iter(|| {
|
||||
let mut s = 0;
|
||||
for &key in &keys {
|
||||
s ^= murmurhash2(key.as_bytes());
|
||||
}
|
||||
s
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -258,7 +288,10 @@ mod tests {
|
||||
let mut vanilla_hash_map = HashMap::new();
|
||||
let mut iter_values = hash_map.iter();
|
||||
while let Some((key, addr, _)) = iter_values.next() {
|
||||
let val: u32 = hash_map.heap.read(addr);
|
||||
let val: u32 = unsafe {
|
||||
// test
|
||||
hash_map.heap.read(addr)
|
||||
};
|
||||
vanilla_hash_map.insert(key.to_owned(), val);
|
||||
}
|
||||
assert_eq!(vanilla_hash_map.len(), 2);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use core::SegmentReader;
|
||||
use downcast_rs::Downcast;
|
||||
use downcast::Downcast;
|
||||
use query::intersect_scorers;
|
||||
use query::score_combiner::{DoNothingCombiner, ScoreCombiner, SumWithCoordsCombiner};
|
||||
use query::term_query::TermScorer;
|
||||
@@ -10,6 +10,7 @@ use query::RequiredOptionalScorer;
|
||||
use query::Scorer;
|
||||
use query::Union;
|
||||
use query::Weight;
|
||||
use std::borrow::Borrow;
|
||||
use std::collections::HashMap;
|
||||
use Result;
|
||||
|
||||
@@ -24,12 +25,13 @@ where
|
||||
|
||||
{
|
||||
let is_all_term_queries = scorers.iter().all(|scorer| {
|
||||
scorer.is::<TermScorer>()
|
||||
let scorer_ref: &Scorer = scorer.borrow();
|
||||
Downcast::<TermScorer>::is_type(scorer_ref)
|
||||
});
|
||||
if is_all_term_queries {
|
||||
let scorers: Vec<TermScorer> = scorers
|
||||
.into_iter()
|
||||
.map(|scorer| *(scorer.downcast::<TermScorer>().map_err(|_| ()).unwrap() ))
|
||||
.map(|scorer| *Downcast::<TermScorer>::downcast(scorer).unwrap())
|
||||
.collect();
|
||||
let scorer: Box<Scorer> = Box::new(Union::<TermScorer, TScoreCombiner>::from(scorers));
|
||||
return scorer;
|
||||
|
||||
@@ -8,7 +8,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use collector::tests::TestCollector;
|
||||
use downcast_rs::Downcast;
|
||||
use downcast::Downcast;
|
||||
use query::score_combiner::SumWithCoordsCombiner;
|
||||
use query::term_query::TermScorer;
|
||||
use query::Intersection;
|
||||
@@ -29,7 +29,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
let doc = doc!(text_field => "a b c");
|
||||
index_writer.add_document(doc);
|
||||
@@ -72,7 +72,7 @@ mod tests {
|
||||
let searcher = index.searcher();
|
||||
let weight = query.weight(&searcher, true).unwrap();
|
||||
let scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
|
||||
assert!(scorer.is::<TermScorer>());
|
||||
assert!(Downcast::<TermScorer>::is_type(&*scorer));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -84,13 +84,13 @@ mod tests {
|
||||
let query = query_parser.parse_query("+a +b +c").unwrap();
|
||||
let weight = query.weight(&searcher, true).unwrap();
|
||||
let scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
|
||||
assert!(scorer.is::<Intersection<TermScorer>>());
|
||||
assert!(Downcast::<Intersection<TermScorer>>::is_type(&*scorer));
|
||||
}
|
||||
{
|
||||
let query = query_parser.parse_query("+a +(b c)").unwrap();
|
||||
let weight = query.weight(&searcher, true).unwrap();
|
||||
let scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
|
||||
assert!(scorer.is::<Intersection<Box<Scorer>>>());
|
||||
assert!(Downcast::<Intersection<Box<Scorer>>>::is_type(&*scorer));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,16 +103,18 @@ mod tests {
|
||||
let query = query_parser.parse_query("+a b").unwrap();
|
||||
let weight = query.weight(&searcher, true).unwrap();
|
||||
let scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
|
||||
assert!(scorer.is::<RequiredOptionalScorer<Box<Scorer>, Box<Scorer>, SumWithCoordsCombiner>>());
|
||||
assert!(Downcast::<
|
||||
RequiredOptionalScorer<Box<Scorer>, Box<Scorer>, SumWithCoordsCombiner>,
|
||||
>::is_type(&*scorer));
|
||||
}
|
||||
{
|
||||
let query = query_parser.parse_query("+a b").unwrap();
|
||||
let weight = query.weight(&searcher, false).unwrap();
|
||||
let scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
|
||||
assert!(scorer.is::<TermScorer>());
|
||||
println!("{:?}", scorer.type_name());
|
||||
assert!(Downcast::<TermScorer>::is_type(&*scorer));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_boolean_query() {
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
use docset::{DocSet, SkipResult};
|
||||
use downcast::Downcast;
|
||||
use query::term_query::TermScorer;
|
||||
use query::EmptyScorer;
|
||||
use query::Scorer;
|
||||
use std::borrow::Borrow;
|
||||
use DocId;
|
||||
use Score;
|
||||
use query::term_query::TermScorer;
|
||||
|
||||
/// Returns the intersection scorer.
|
||||
///
|
||||
@@ -24,12 +26,13 @@ pub fn intersect_scorers(mut scorers: Vec<Box<Scorer>>) -> Box<Scorer> {
|
||||
(Some(single_docset), None) => single_docset,
|
||||
(Some(left), Some(right)) => {
|
||||
{
|
||||
let all_term_scorers = [&left, &right].iter().all(|&scorer| {
|
||||
scorer.is::<TermScorer>()
|
||||
let all_term_scorers = [&left, &right].into_iter().all(|scorer| {
|
||||
let scorer_ref: &Scorer = (*scorer).borrow();
|
||||
Downcast::<TermScorer>::is_type(scorer_ref)
|
||||
});
|
||||
if all_term_scorers {
|
||||
let left = *(left.downcast::<TermScorer>().map_err(|_| ()).unwrap());
|
||||
let right = *(right.downcast::<TermScorer>().map_err(|_| ()).unwrap());
|
||||
let left = *Downcast::<TermScorer>::downcast(left).unwrap();
|
||||
let right = *Downcast::<TermScorer>::downcast(right).unwrap();
|
||||
return Box::new(Intersection {
|
||||
left,
|
||||
right,
|
||||
|
||||
@@ -24,7 +24,7 @@ mod tests {
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
for &text in texts {
|
||||
let doc = doc!(text_field=>text);
|
||||
index_writer.add_document(doc);
|
||||
@@ -86,7 +86,7 @@ mod tests {
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"a b c"));
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
@@ -141,7 +141,7 @@ mod tests {
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"b"));
|
||||
index_writer.add_document(doc!(text_field=>"a b"));
|
||||
index_writer.add_document(doc!(text_field=>"b a"));
|
||||
@@ -173,7 +173,7 @@ mod tests {
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field=>"a b c d e f g h"));
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use super::Weight;
|
||||
use core::searcher::Searcher;
|
||||
use downcast_rs;
|
||||
use downcast;
|
||||
use std::collections::BTreeSet;
|
||||
use std::fmt;
|
||||
use Result;
|
||||
@@ -39,7 +39,7 @@ use Term;
|
||||
///
|
||||
/// When implementing a new type of `Query`, it is normal to implement a
|
||||
/// dedicated `Query`, `Weight` and `Scorer`.
|
||||
pub trait Query: QueryClone + downcast_rs::Downcast + fmt::Debug {
|
||||
pub trait Query: QueryClone + downcast::Any + fmt::Debug {
|
||||
/// Create the weight associated to a query.
|
||||
///
|
||||
/// If scoring is not required, setting `scoring_enabled` to `false`
|
||||
@@ -96,4 +96,7 @@ impl QueryClone for Box<Query> {
|
||||
}
|
||||
}
|
||||
|
||||
impl_downcast!(Query);
|
||||
#[allow(missing_docs)]
|
||||
mod downcast_impl {
|
||||
downcast!(super::Query);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use common::BitSet;
|
||||
use docset::{DocSet, SkipResult};
|
||||
use downcast_rs;
|
||||
use downcast;
|
||||
use std::ops::DerefMut;
|
||||
use DocId;
|
||||
use Score;
|
||||
@@ -8,7 +8,7 @@ use Score;
|
||||
/// Scored set of documents matching a query within a specific segment.
|
||||
///
|
||||
/// See [`Query`](./trait.Query.html).
|
||||
pub trait Scorer: downcast_rs::Downcast + DocSet + 'static {
|
||||
pub trait Scorer: downcast::Any + DocSet + 'static {
|
||||
/// Returns the score.
|
||||
///
|
||||
/// This method will perform a bit of computation and is not cached.
|
||||
@@ -23,8 +23,10 @@ pub trait Scorer: downcast_rs::Downcast + DocSet + 'static {
|
||||
}
|
||||
}
|
||||
|
||||
impl_downcast!(Scorer);
|
||||
|
||||
#[allow(missing_docs)]
|
||||
mod downcast_impl {
|
||||
downcast!(super::Scorer);
|
||||
}
|
||||
|
||||
impl Scorer for Box<Scorer> {
|
||||
fn score(&mut self) -> Score {
|
||||
|
||||
@@ -25,7 +25,7 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
let doc = doc!(text_field => "a");
|
||||
index_writer.add_document(doc);
|
||||
|
||||
@@ -133,7 +133,7 @@ impl<'a, T: ?Sized + AsRef<str>> From<&'a T> for Facet {
|
||||
}
|
||||
let path: &str = path_asref.as_ref();
|
||||
assert!(!path.is_empty());
|
||||
assert!(path.starts_with('/'));
|
||||
assert!(path.starts_with("/"));
|
||||
let mut facet_encoded = String::new();
|
||||
let mut state = State::Idle;
|
||||
let path_bytes = path.as_bytes();
|
||||
|
||||
@@ -523,7 +523,7 @@ Survey in 2016, 2017, and 2018."#;
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer.add_document(doc!(text_field => "a"));
|
||||
index_writer.add_document(doc!(text_field => "a"));
|
||||
index_writer.add_document(doc!(text_field => "a b"));
|
||||
@@ -580,7 +580,7 @@ Survey in 2016, 2017, and 2018."#;
|
||||
let index = Index::create_in_ram(schema);
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
let doc = doc ! (text_field => TEST_TEXT);
|
||||
index_writer.add_document(doc);
|
||||
|
||||
@@ -133,7 +133,7 @@ mod tests {
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
{
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
{
|
||||
let mut doc = Document::default();
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use byteorder::ByteOrder;
|
||||
use common::bitpacker::BitPacker;
|
||||
use common::compute_num_bits;
|
||||
use common::Endianness;
|
||||
@@ -7,6 +7,7 @@ use directory::ReadOnlySource;
|
||||
use postings::TermInfo;
|
||||
use std::cmp;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ptr;
|
||||
use termdict::TermOrdinal;
|
||||
|
||||
const BLOCK_LEN: usize = 256;
|
||||
@@ -87,17 +88,13 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
|
||||
assert!(num_bits <= 56);
|
||||
let addr_byte = addr_bits / 8;
|
||||
let bit_shift = (addr_bits % 8) as u64;
|
||||
let val_unshifted_unmasked: u64 = if data.len() >= addr_byte + 8 {
|
||||
LittleEndian::read_u64(&data[addr_byte..][..8])
|
||||
} else {
|
||||
// the buffer is not large enough.
|
||||
// Let's copy the few remaining bytes to a 8 byte buffer
|
||||
// padded with 0s.
|
||||
let mut buf = [0u8; 8];
|
||||
let data_to_copy = &data[addr_byte..];
|
||||
let nbytes = data_to_copy.len();
|
||||
buf[..nbytes].copy_from_slice(data_to_copy);
|
||||
LittleEndian::read_u64(&buf)
|
||||
assert!(data.len() >= addr_byte + 7);
|
||||
let val_unshifted_unmasked: u64 = unsafe {
|
||||
// ok because the pointer is only accessed using `ptr::read_unaligned`
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(clippy::cast_ptr_alignment))]
|
||||
let addr = data.as_ptr().add(addr_byte) as *const u64;
|
||||
// ok thanks to the 7 byte padding
|
||||
ptr::read_unaligned(addr)
|
||||
};
|
||||
let val_shifted_unmasked = val_unshifted_unmasked >> bit_shift;
|
||||
let mask = (1u64 << u64::from(num_bits)) - 1;
|
||||
@@ -249,6 +246,7 @@ impl TermInfoStoreWriter {
|
||||
self.num_terms.serialize(write)?;
|
||||
write.write_all(&self.buffer_block_metas)?;
|
||||
write.write_all(&self.buffer_term_infos)?;
|
||||
write.write_all(&[0u8; 7])?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@ where
|
||||
self.token_mut().text.make_ascii_lowercase();
|
||||
} else {
|
||||
to_lowercase_unicode(&mut self.tail.token_mut().text, &mut self.buffer);
|
||||
|
||||
mem::swap(&mut self.tail.token_mut().text, &mut self.buffer);
|
||||
}
|
||||
true
|
||||
|
||||
@@ -73,7 +73,7 @@
|
||||
//! let en_stem = SimpleTokenizer
|
||||
//! .filter(RemoveLongFilter::limit(40))
|
||||
//! .filter(LowerCaser)
|
||||
//! .filter(Stemmer::new(Language::English));
|
||||
//! .filter(Stemmer::new());
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
@@ -148,7 +148,7 @@ pub use self::ngram_tokenizer::NgramTokenizer;
|
||||
pub use self::raw_tokenizer::RawTokenizer;
|
||||
pub use self::remove_long::RemoveLongFilter;
|
||||
pub use self::simple_tokenizer::SimpleTokenizer;
|
||||
pub use self::stemmer::{Language, Stemmer};
|
||||
pub use self::stemmer::Stemmer;
|
||||
pub use self::stop_word_filter::StopWordFilter;
|
||||
pub(crate) use self::token_stream_chain::TokenStreamChain;
|
||||
pub(crate) use self::tokenizer::box_tokenizer;
|
||||
@@ -159,10 +159,8 @@ pub use self::tokenizer_manager::TokenizerManager;
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use super::{
|
||||
Language, LowerCaser, RemoveLongFilter, SimpleTokenizer, Stemmer, Token, Tokenizer,
|
||||
TokenizerManager,
|
||||
};
|
||||
use super::Token;
|
||||
use super::TokenizerManager;
|
||||
|
||||
/// This is a function that can be used in tests and doc tests
|
||||
/// to assert a token's correctness.
|
||||
@@ -216,7 +214,6 @@ pub mod tests {
|
||||
.token_stream("Hello, happy tax payer!")
|
||||
.process(&mut add_token);
|
||||
}
|
||||
|
||||
assert_eq!(tokens.len(), 4);
|
||||
assert_token(&tokens[0], 0, "hello", 0, 5);
|
||||
assert_token(&tokens[1], 1, "happi", 7, 12);
|
||||
@@ -224,33 +221,6 @@ pub mod tests {
|
||||
assert_token(&tokens[3], 3, "payer", 17, 22);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_non_en_tokenizer() {
|
||||
let tokenizer_manager = TokenizerManager::default();
|
||||
tokenizer_manager.register(
|
||||
"es_stem",
|
||||
SimpleTokenizer
|
||||
.filter(RemoveLongFilter::limit(40))
|
||||
.filter(LowerCaser)
|
||||
.filter(Stemmer::new(Language::Spanish)),
|
||||
);
|
||||
let en_tokenizer = tokenizer_manager.get("es_stem").unwrap();
|
||||
let mut tokens: Vec<Token> = vec![];
|
||||
{
|
||||
let mut add_token = |token: &Token| {
|
||||
tokens.push(token.clone());
|
||||
};
|
||||
en_tokenizer
|
||||
.token_stream("Hola, feliz contribuyente!")
|
||||
.process(&mut add_token);
|
||||
}
|
||||
|
||||
assert_eq!(tokens.len(), 3);
|
||||
assert_token(&tokens[0], 0, "hola", 0, 4);
|
||||
assert_token(&tokens[1], 1, "feliz", 6, 11);
|
||||
assert_token(&tokens[2], 2, "contribuyent", 12, 25);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tokenizer_empty() {
|
||||
let tokenizer_manager = TokenizerManager::default();
|
||||
|
||||
@@ -4,76 +4,22 @@ use super::{Token, TokenFilter, TokenStream};
|
||||
use rust_stemmers::{self, Algorithm};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Available stemmer languages.
|
||||
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone)]
|
||||
#[allow(missing_docs)]
|
||||
pub enum Language {
|
||||
Arabic,
|
||||
Danish,
|
||||
Dutch,
|
||||
English,
|
||||
Finnish,
|
||||
French,
|
||||
German,
|
||||
Hungarian,
|
||||
Italian,
|
||||
Portuguese,
|
||||
Romanian,
|
||||
Russian,
|
||||
Spanish,
|
||||
Swedish,
|
||||
Tamil,
|
||||
Turkish,
|
||||
}
|
||||
|
||||
impl Language {
|
||||
fn algorithm(self) -> Algorithm {
|
||||
use self::Language::*;
|
||||
match self {
|
||||
Arabic => Algorithm::Arabic,
|
||||
Danish => Algorithm::Danish,
|
||||
Dutch => Algorithm::Dutch,
|
||||
English => Algorithm::English,
|
||||
Finnish => Algorithm::Finnish,
|
||||
French => Algorithm::French,
|
||||
German => Algorithm::German,
|
||||
Hungarian => Algorithm::Hungarian,
|
||||
Italian => Algorithm::Italian,
|
||||
Portuguese => Algorithm::Portuguese,
|
||||
Romanian => Algorithm::Romanian,
|
||||
Russian => Algorithm::Russian,
|
||||
Spanish => Algorithm::Spanish,
|
||||
Swedish => Algorithm::Swedish,
|
||||
Tamil => Algorithm::Tamil,
|
||||
Turkish => Algorithm::Turkish,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `Stemmer` token filter. Several languages are supported, see `Language` for the available
|
||||
/// languages.
|
||||
/// Tokens are expected to be lowercased beforehand.
|
||||
/// `Stemmer` token filter. Currently only English is supported.
|
||||
/// Tokens are expected to be lowercased beforehands.
|
||||
#[derive(Clone)]
|
||||
pub struct Stemmer {
|
||||
stemmer_algorithm: Arc<Algorithm>,
|
||||
}
|
||||
|
||||
impl Stemmer {
|
||||
/// Creates a new Stemmer `TokenFilter` for a given language algorithm.
|
||||
pub fn new(language: Language) -> Stemmer {
|
||||
/// Creates a new Stemmer `TokenFilter`.
|
||||
pub fn new() -> Stemmer {
|
||||
Stemmer {
|
||||
stemmer_algorithm: Arc::new(language.algorithm()),
|
||||
stemmer_algorithm: Arc::new(Algorithm::English),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Stemmer {
|
||||
/// Creates a new Stemmer `TokenFilter` for English.
|
||||
fn default() -> Self {
|
||||
Stemmer::new(Language::English)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TailTokenStream> TokenFilter<TailTokenStream> for Stemmer
|
||||
where
|
||||
TailTokenStream: TokenStream,
|
||||
|
||||
@@ -64,7 +64,7 @@ pub trait Tokenizer<'a>: Sized + Clone {
|
||||
/// let en_stem = SimpleTokenizer
|
||||
/// .filter(RemoveLongFilter::limit(40))
|
||||
/// .filter(LowerCaser)
|
||||
/// .filter(Stemmer::default());
|
||||
/// .filter(Stemmer::new());
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use tokenizer::box_tokenizer;
|
||||
use tokenizer::stemmer::Language;
|
||||
use tokenizer::BoxedTokenizer;
|
||||
use tokenizer::LowerCaser;
|
||||
use tokenizer::RawTokenizer;
|
||||
@@ -72,7 +71,7 @@ impl Default for TokenizerManager {
|
||||
SimpleTokenizer
|
||||
.filter(RemoveLongFilter::limit(40))
|
||||
.filter(LowerCaser)
|
||||
.filter(Stemmer::new(Language::English)),
|
||||
.filter(Stemmer::new()),
|
||||
);
|
||||
manager
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user