Issue/325 (#330)

* Introducing a SegmentMea inventory.
* Depending on census=0.1
* Cargo fmt
This commit is contained in:
Paul Masurel
2018-06-30 13:11:41 +09:00
committed by GitHub
parent 1ce36bb211
commit 8ebbf6b336
40 changed files with 440 additions and 514 deletions

View File

@@ -45,6 +45,7 @@ rust-stemmers = "0.1.0"
downcast = { version="0.9" }
matches = "0.1"
bitpacking = "0.5"
census = "0.1"
fnv = "1.0.6"
[target.'cfg(windows)'.dependencies]

View File

@@ -4,7 +4,7 @@
os: Visual Studio 2015
environment:
matrix:
- channel: nightly
- channel: stable
target: x86_64-pc-windows-msvc
install:

View File

@@ -342,19 +342,16 @@ impl FacetCollector {
pub fn harvest(mut self) -> FacetCounts {
self.finalize_segment();
let collapsed_facet_ords: Vec<&[u64]> = self
.segment_counters
let collapsed_facet_ords: Vec<&[u64]> = self.segment_counters
.iter()
.map(|segment_counter| &segment_counter.facet_ords[..])
.collect();
let collapsed_facet_counts: Vec<&[u64]> = self
.segment_counters
let collapsed_facet_counts: Vec<&[u64]> = self.segment_counters
.iter()
.map(|segment_counter| &segment_counter.facet_counts[..])
.collect();
let facet_streams = self
.segment_counters
let facet_streams = self.segment_counters
.iter()
.map(|seg_counts| seg_counts.facet_reader.facet_dict().range().into_stream())
.collect::<Vec<_>>();
@@ -405,8 +402,7 @@ impl Collector for FacetCollector {
fn collect(&mut self, doc: DocId, _: Score) {
let facet_reader: &mut FacetReader = unsafe {
&mut *self
.ff_reader
&mut *self.ff_reader
.as_ref()
.expect("collect() was called before set_segment. This should never happen.")
.get()

View File

@@ -161,13 +161,11 @@ impl Collector for TopCollector {
fn collect(&mut self, doc: DocId, score: Score) {
if self.at_capacity() {
// It's ok to unwrap as long as a limit of 0 is forbidden.
let limit_doc: GlobalScoredDoc = *self
.heap
let limit_doc: GlobalScoredDoc = *self.heap
.peek()
.expect("Top collector with size 0 is forbidden");
if limit_doc.score < score {
let mut mut_head = self
.heap
let mut mut_head = self.heap
.peek_mut()
.expect("Top collector with size 0 is forbidden");
mut_head.score = score;

View File

@@ -72,8 +72,7 @@ impl<W: Write> CompositeWrite<W> {
let footer_offset = self.write.written_bytes();
VInt(self.offsets.len() as u64).serialize(&mut self.write)?;
let mut offset_fields: Vec<_> = self
.offsets
let mut offset_fields: Vec<_> = self.offsets
.iter()
.map(|(file_addr, offset)| (*offset, *file_addr))
.collect();

View File

@@ -34,8 +34,7 @@ impl BlockEncoder {
let num_bits = self.bitpacker.num_bits_sorted(offset, block);
self.output[0] = num_bits;
let written_size =
1 + self
.bitpacker
1 + self.bitpacker
.compress_sorted(offset, block, &mut self.output[1..], num_bits);
&self.output[..written_size]
}
@@ -43,8 +42,7 @@ impl BlockEncoder {
pub fn compress_block_unsorted(&mut self, block: &[u32]) -> &[u8] {
let num_bits = self.bitpacker.num_bits(block);
self.output[0] = num_bits;
let written_size = 1 + self
.bitpacker
let written_size = 1 + self.bitpacker
.compress(block, &mut self.output[1..], num_bits);
&self.output[..written_size]
}
@@ -85,8 +83,7 @@ impl BlockDecoder {
pub fn uncompress_block_unsorted<'a>(&mut self, compressed_data: &'a [u8]) -> usize {
let num_bits = compressed_data[0];
self.output_len = COMPRESSION_BLOCK_SIZE;
1 + self
.bitpacker
1 + self.bitpacker
.decompress(&compressed_data[1..], &mut self.output, num_bits)
}

View File

@@ -42,8 +42,7 @@ impl CompressedIntStream {
// no need to read.
self.cached_next_addr
} else {
let next_addr = addr + self
.block_decoder
let next_addr = addr + self.block_decoder
.uncompress_block_unsorted(self.buffer.slice_from(addr));
self.cached_addr = addr;
self.cached_next_addr = next_addr;

View File

@@ -191,8 +191,7 @@ impl Index {
/// Returns the list of segments that are searchable
pub fn searchable_segments(&self) -> Result<Vec<Segment>> {
Ok(self
.searchable_segment_metas()?
Ok(self.searchable_segment_metas()?
.into_iter()
.map(|segment_meta| self.segment(segment_meta))
.collect())
@@ -205,8 +204,8 @@ impl Index {
/// Creates a new segment.
pub fn new_segment(&self) -> Segment {
let segment_meta = SegmentMeta::new(SegmentId::generate_random());
create_segment(self.clone(), segment_meta)
let segment_meta = SegmentMeta::new(SegmentId::generate_random(), 0);
self.segment(segment_meta)
}
/// Return a reference to the index directory.
@@ -227,8 +226,7 @@ impl Index {
/// Returns the list of segment ids that are searchable.
pub fn searchable_segment_ids(&self) -> Result<Vec<SegmentId>> {
Ok(self
.searchable_segment_metas()?
Ok(self.searchable_segment_metas()?
.iter()
.map(|segment_meta| segment_meta.id())
.collect())

View File

@@ -87,8 +87,7 @@ impl<T> Deref for LeasedItem<T> {
type Target = T;
fn deref(&self) -> &T {
&self
.gen_item
&self.gen_item
.as_ref()
.expect("Unwrapping a leased item should never fail")
.item // unwrap is safe here
@@ -97,8 +96,7 @@ impl<T> Deref for LeasedItem<T> {
impl<T> DerefMut for LeasedItem<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self
.gen_item
&mut self.gen_item
.as_mut()
.expect("Unwrapping a mut leased item should never fail")
.item // unwrap is safe here

View File

@@ -78,8 +78,7 @@ impl Searcher {
/// Return the field searcher associated to a `Field`.
pub fn field(&self, field: Field) -> FieldSearcher {
let inv_index_readers = self
.segment_readers
let inv_index_readers = self.segment_readers
.iter()
.map(|segment_reader| segment_reader.inverted_index(field))
.collect::<Vec<_>>();
@@ -99,8 +98,7 @@ impl FieldSearcher {
/// Returns a Stream over all of the sorted unique terms of
/// for the given field.
pub fn terms(&self) -> TermMerger {
let term_streamers: Vec<_> = self
.inv_index_readers
let term_streamers: Vec<_> = self.inv_index_readers
.iter()
.map(|inverted_index| inverted_index.terms().stream())
.collect();
@@ -110,8 +108,7 @@ impl FieldSearcher {
impl fmt::Debug for Searcher {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let segment_ids = self
.segment_readers
let segment_ids = self.segment_readers
.iter()
.map(|segment_reader| segment_reader.segment_id())
.collect::<Vec<_>>();

View File

@@ -4,7 +4,7 @@ use core::SegmentId;
use core::SegmentMeta;
use directory::error::{OpenReadError, OpenWriteError};
use directory::Directory;
use directory::{FileProtection, ReadOnlySource, WritePtr};
use directory::{ReadOnlySource, WritePtr};
use indexer::segment_serializer::SegmentSerializer;
use schema::Schema;
use std::fmt;
@@ -28,6 +28,7 @@ impl fmt::Debug for Segment {
/// Creates a new segment given an `Index` and a `SegmentId`
///
/// The function is here to make it private outside `tantivy`.
/// #[doc(hidden)]
pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment {
Segment { index, meta }
}
@@ -49,8 +50,11 @@ impl Segment {
}
#[doc(hidden)]
pub fn set_delete_meta(&mut self, num_deleted_docs: u32, opstamp: u64) {
self.meta.set_delete_meta(num_deleted_docs, opstamp);
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),
}
}
/// Returns the segment's id.
@@ -66,16 +70,6 @@ impl Segment {
self.meta.relative_path(component)
}
/// Protects a specific component file from being deleted.
///
/// Returns a FileProtection object. The file is guaranteed
/// to not be garbage collected as long as this `FileProtection` object
/// lives.
pub fn protect_from_delete(&self, component: SegmentComponent) -> FileProtection {
let path = self.relative_path(component);
self.index.directory().protect_file_from_delete(&path)
}
/// Open one of the component file for a *regular* read.
pub fn open_read(
&self,
@@ -105,35 +99,3 @@ pub trait SerializableSegment {
/// The number of documents in the segment.
fn write(&self, serializer: SegmentSerializer) -> Result<u32>;
}
#[cfg(test)]
mod tests {
use core::SegmentComponent;
use directory::Directory;
use schema::SchemaBuilder;
use std::collections::HashSet;
use Index;
#[test]
fn test_segment_protect_component() {
let mut index = Index::create_in_ram(SchemaBuilder::new().build());
let segment = index.new_segment();
let path = segment.relative_path(SegmentComponent::POSTINGS);
let directory = index.directory_mut();
directory.atomic_write(&*path, &vec![0u8]).unwrap();
let living_files = HashSet::new();
{
let _file_protection = segment.protect_from_delete(SegmentComponent::POSTINGS);
assert!(directory.exists(&*path));
directory.garbage_collect(|| living_files.clone());
assert!(directory.exists(&*path));
}
directory.garbage_collect(|| living_files);
assert!(!directory.exists(&*path));
}
}

View File

@@ -1,8 +1,15 @@
use super::SegmentComponent;
use census::{Inventory, TrackedObject};
use core::SegmentId;
use serde;
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
lazy_static! {
static ref INVENTORY: Inventory<InnerSegmentMeta> = { Inventory::new() };
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
num_deleted_docs: u32,
@@ -13,32 +20,72 @@ struct DeleteMeta {
///
/// For instance the number of docs it contains,
/// how many are deleted, etc.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone)]
pub struct SegmentMeta {
segment_id: SegmentId,
max_doc: u32,
deletes: Option<DeleteMeta>,
tracked: TrackedObject<InnerSegmentMeta>,
}
impl fmt::Debug for SegmentMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
self.tracked.fmt(f)
}
}
impl serde::Serialize for SegmentMeta {
fn serialize<S>(
&self,
serializer: S,
) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
where
S: serde::Serializer,
{
self.tracked.serialize(serializer)
}
}
impl<'a> serde::Deserialize<'a> for SegmentMeta {
fn deserialize<D>(deserializer: D) -> Result<Self, <D as serde::Deserializer<'a>>::Error>
where
D: serde::Deserializer<'a>,
{
let inner = InnerSegmentMeta::deserialize(deserializer)?;
let tracked = INVENTORY.track(inner);
Ok(SegmentMeta { tracked: tracked })
}
}
impl SegmentMeta {
/// Creates a new segment meta for
/// a segment with no deletes and no documents.
pub fn new(segment_id: SegmentId) -> SegmentMeta {
SegmentMeta {
/// Lists all living `SegmentMeta` object at the time of the call.
pub fn all() -> Vec<SegmentMeta> {
INVENTORY
.list()
.into_iter()
.map(|inner| SegmentMeta { tracked: inner })
.collect::<Vec<_>>()
}
/// Creates a new `SegmentMeta` object.
#[doc(hidden)]
pub fn new(segment_id: SegmentId, max_doc: u32) -> SegmentMeta {
let inner = InnerSegmentMeta {
segment_id,
max_doc: 0,
max_doc,
deletes: None,
};
SegmentMeta {
tracked: INVENTORY.track(inner),
}
}
/// Returns the segment id.
pub fn id(&self) -> SegmentId {
self.segment_id
self.tracked.segment_id
}
/// Returns the number of deleted documents.
pub fn num_deleted_docs(&self) -> u32 {
self.deletes
self.tracked
.deletes
.as_ref()
.map(|delete_meta| delete_meta.num_deleted_docs)
.unwrap_or(0u32)
@@ -80,7 +127,7 @@ impl SegmentMeta {
/// and all the doc ids contains in this segment
/// are exactly (0..max_doc).
pub fn max_doc(&self) -> u32 {
self.max_doc
self.tracked.max_doc
}
/// Return the number of documents in the segment.
@@ -91,25 +138,36 @@ impl SegmentMeta {
/// Returns the opstamp of the last delete operation
/// taken in account in this segment.
pub fn delete_opstamp(&self) -> Option<u64> {
self.deletes.as_ref().map(|delete_meta| delete_meta.opstamp)
self.tracked
.deletes
.as_ref()
.map(|delete_meta| delete_meta.opstamp)
}
/// Returns true iff the segment meta contains
/// delete information.
pub fn has_deletes(&self) -> bool {
self.deletes.is_some()
self.num_deleted_docs() > 0
}
#[doc(hidden)]
pub fn set_max_doc(&mut self, max_doc: u32) {
self.max_doc = max_doc;
}
#[doc(hidden)]
pub fn set_delete_meta(&mut self, num_deleted_docs: u32, opstamp: u64) {
self.deletes = Some(DeleteMeta {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta {
let delete_meta = DeleteMeta {
num_deleted_docs,
opstamp,
};
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
segment_id: inner_meta.segment_id,
max_doc: inner_meta.max_doc,
deletes: Some(delete_meta),
});
SegmentMeta { tracked }
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct InnerSegmentMeta {
segment_id: SegmentId,
max_doc: u32,
deletes: Option<DeleteMeta>,
}

View File

@@ -156,13 +156,11 @@ impl SegmentReader {
&FieldType::Bytes => {}
_ => return Err(FastFieldNotAvailableError::new(field_entry)),
}
let idx_reader = self
.fast_fields_composite
let idx_reader = self.fast_fields_composite
.open_read_with_idx(field, 0)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))
.map(FastFieldReader::open)?;
let values = self
.fast_fields_composite
let values = self.fast_fields_composite
.open_read_with_idx(field, 1)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))?;
Ok(BytesFastFieldReader::open(idx_reader, values))
@@ -274,8 +272,7 @@ impl SegmentReader {
/// term dictionary associated to a specific field,
/// and opening the posting list associated to any term.
pub fn inverted_index(&self, field: Field) -> Arc<InvertedIndexReader> {
if let Some(inv_idx_reader) = self
.inv_idx_reader_cache
if let Some(inv_idx_reader) = self.inv_idx_reader_cache
.read()
.expect("Lock poisoned. This should never happen")
.get(&field)
@@ -304,13 +301,11 @@ impl SegmentReader {
let postings_source = postings_source_opt.unwrap();
let termdict_source = self
.termdict_composite
let termdict_source = self.termdict_composite
.open_read(field)
.expect("Failed to open field term dictionary in composite file. Is the field indexed");
let positions_source = self
.positions_composite
let positions_source = self.positions_composite
.open_read(field)
.expect("Index corrupted. Failed to open field positions in composite file.");

View File

@@ -173,9 +173,6 @@ pub enum DeleteError {
/// Any kind of IO error that happens when
/// interacting with the underlying IO device.
IOError(IOError),
/// The file may not be deleted because it is
/// protected.
FileProtected(PathBuf),
}
impl From<IOError> for DeleteError {
@@ -190,9 +187,6 @@ impl fmt::Display for DeleteError {
DeleteError::FileDoesNotExist(ref path) => {
write!(f, "the file '{:?}' does not exist", path)
}
DeleteError::FileProtected(ref path) => {
write!(f, "the file '{:?}' is protected and can't be deleted", path)
}
DeleteError::IOError(ref err) => {
write!(f, "an io error occurred while deleting a file: '{}'", err)
}
@@ -207,7 +201,7 @@ impl StdError for DeleteError {
fn cause(&self) -> Option<&StdError> {
match *self {
DeleteError::FileDoesNotExist(_) | DeleteError::FileProtected(_) => None,
DeleteError::FileDoesNotExist(_) => None,
DeleteError::IOError(ref err) => Some(err),
}
}

View File

@@ -3,9 +3,7 @@ use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
use directory::{ReadOnlySource, WritePtr};
use error::{ErrorKind, Result, ResultExt};
use serde_json;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
use std::io;
use std::io::Write;
use std::path::{Path, PathBuf};
@@ -32,37 +30,6 @@ pub struct ManagedDirectory {
#[derive(Debug, Default)]
struct MetaInformation {
managed_paths: HashSet<PathBuf>,
protected_files: HashMap<PathBuf, usize>,
}
/// A `FileProtection` prevents the garbage collection of a file.
///
/// See `ManagedDirectory.protect_file_from_delete`.
pub struct FileProtection {
directory: ManagedDirectory,
path: PathBuf,
}
fn unprotect_file_from_delete(directory: &ManagedDirectory, path: &Path) {
let mut meta_informations_wlock = directory
.meta_informations
.write()
.expect("Managed file lock poisoned");
if let Some(counter_ref_mut) = meta_informations_wlock.protected_files.get_mut(path) {
(*counter_ref_mut) -= 1;
}
}
impl fmt::Debug for FileProtection {
fn fmt(&self, formatter: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(formatter, "FileProtectionFor({:?})", self.path)
}
}
impl Drop for FileProtection {
fn drop(&mut self) {
unprotect_file_from_delete(&self.directory, &*self.path);
}
}
/// Saves the file containing the list of existing files
@@ -89,7 +56,6 @@ impl ManagedDirectory {
directory: Box::new(directory),
meta_informations: Arc::new(RwLock::new(MetaInformation {
managed_paths: managed_files,
protected_files: HashMap::default(),
})),
})
}
@@ -117,8 +83,7 @@ impl ManagedDirectory {
let mut files_to_delete = vec![];
{
// releasing the lock as .delete() will use it too.
let meta_informations_rlock = self
.meta_informations
let meta_informations_rlock = self.meta_informations
.read()
.expect("Managed directory rlock poisoned in garbage collect.");
@@ -159,9 +124,6 @@ impl ManagedDirectory {
error!("Failed to delete {:?}", file_to_delete);
}
}
DeleteError::FileProtected(_) => {
// this is expected.
}
}
}
}
@@ -171,8 +133,7 @@ impl ManagedDirectory {
if !deleted_files.is_empty() {
// update the list of managed files by removing
// the file that were removed.
let mut meta_informations_wlock = self
.meta_informations
let mut meta_informations_wlock = self.meta_informations
.write()
.expect("Managed directory wlock poisoned (2).");
{
@@ -187,29 +148,6 @@ impl ManagedDirectory {
}
}
/// Protects a file from being garbage collected.
///
/// The method returns a `FileProtection` object.
/// The file will not be garbage collected as long as the
/// `FileProtection` object is kept alive.
pub fn protect_file_from_delete(&self, path: &Path) -> FileProtection {
let pathbuf = path.to_owned();
{
let mut meta_informations_wlock = self
.meta_informations
.write()
.expect("Managed file lock poisoned on protect");
*meta_informations_wlock
.protected_files
.entry(pathbuf.clone())
.or_insert(0) += 1;
}
FileProtection {
directory: self.clone(),
path: pathbuf.clone(),
}
}
/// Registers a file as managed
///
/// This method must be called before the file is
@@ -218,8 +156,7 @@ impl ManagedDirectory {
/// will not lead to garbage files that will
/// never get removed.
fn register_file_as_managed(&mut self, filepath: &Path) -> io::Result<()> {
let mut meta_wlock = self
.meta_informations
let mut meta_wlock = self.meta_informations
.write()
.expect("Managed file lock poisoned");
let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());
@@ -251,17 +188,6 @@ impl Directory for ManagedDirectory {
}
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
{
let metas_rlock = self
.meta_informations
.read()
.expect("poisoned lock in managed directory meta");
if let Some(counter) = metas_rlock.protected_files.get(path) {
if *counter > 0 {
return Err(DeleteError::FileProtected(path.to_owned()));
}
}
}
self.directory.delete(path)
}
@@ -377,28 +303,4 @@ mod tests {
}
}
#[test]
#[cfg(feature = "mmap")]
fn test_managed_directory_protect() {
let tempdir = TempDir::new("index").unwrap();
let tempdir_path = PathBuf::from(tempdir.path());
let living_files = HashSet::new();
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::new(mmap_directory).unwrap();
managed_directory
.atomic_write(*TEST_PATH1, &vec![0u8, 1u8])
.unwrap();
assert!(managed_directory.exists(*TEST_PATH1));
{
let _file_protection = managed_directory.protect_file_from_delete(*TEST_PATH1);
managed_directory.garbage_collect(|| living_files.clone());
assert!(managed_directory.exists(*TEST_PATH1));
}
managed_directory.garbage_collect(|| living_files.clone());
assert!(!managed_directory.exists(*TEST_PATH1));
}
}

View File

@@ -32,8 +32,7 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadE
}
})?;
let meta_data = file
.metadata()
let meta_data = file.metadata()
.map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
if meta_data.len() == 0 {
// if the file size is 0, it will not be possible
@@ -310,8 +309,7 @@ impl Directory for MmapDirectory {
// when the last reference is gone.
mmap_cache.cache.remove(&full_path);
match fs::remove_file(&full_path) {
Ok(_) => self
.sync_directory()
Ok(_) => self.sync_directory()
.map_err(|e| IOError::with_path(path.to_owned(), e).into()),
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {

View File

@@ -25,7 +25,7 @@ pub use self::read_only_source::ReadOnlySource;
#[cfg(feature = "mmap")]
pub use self::mmap_directory::MmapDirectory;
pub(crate) use self::managed_directory::{FileProtection, ManagedDirectory};
pub(crate) use self::managed_directory::ManagedDirectory;
pub(crate) use self::read_only_source::SourceRead;
/// Synonym of Seek + Write

View File

@@ -170,8 +170,7 @@ impl Directory for RAMDirectory {
let path_buf = PathBuf::from(path);
let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone());
let exists = self
.fs
let exists = self.fs
.write(path_buf.clone(), &Vec::new())
.map_err(|err| IOError::with_path(path.to_owned(), err))?;

View File

@@ -41,8 +41,7 @@ pub struct DeleteBitSet {
impl DeleteBitSet {
/// Opens a delete bitset given its data source.
pub fn open(data: ReadOnlySource) -> DeleteBitSet {
let num_deleted: usize = data
.as_slice()
let num_deleted: usize = data.as_slice()
.iter()
.map(|b| b.count_ones() as usize)
.sum();

View File

@@ -56,8 +56,7 @@ impl FacetReader {
/// Given a term ordinal returns the term associated to it.
pub fn facet_from_ord(&self, facet_ord: TermOrdinal, output: &mut Facet) {
let found_term = self
.term_dict
let found_term = self.term_dict
.ord_to_term(facet_ord as u64, output.inner_buffer_mut());
assert!(found_term, "Term ordinal {} no found.", facet_ord);
}

View File

@@ -52,8 +52,7 @@ impl DeleteQueue {
//
// Past delete operations are not accessible.
pub fn cursor(&self) -> DeleteCursor {
let last_block = self
.inner
let last_block = self.inner
.read()
.expect("Read lock poisoned when opening delete queue cursor")
.last_block
@@ -93,8 +92,7 @@ impl DeleteQueue {
// be some unflushed operations.
//
fn flush(&self) -> Option<Arc<Block>> {
let mut self_wlock = self
.inner
let mut self_wlock = self.inner
.write()
.expect("Failed to acquire write lock on delete queue writer");
@@ -134,8 +132,7 @@ impl From<DeleteQueue> for NextBlock {
impl NextBlock {
fn next_block(&self) -> Option<Arc<Block>> {
{
let next_read_lock = self
.0
let next_read_lock = self.0
.read()
.expect("Failed to acquire write lock in delete queue");
if let InnerNextBlock::Closed(ref block) = *next_read_lock {
@@ -144,8 +141,7 @@ impl NextBlock {
}
let next_block;
{
let mut next_write_lock = self
.0
let mut next_write_lock = self.0
.write()
.expect("Failed to acquire write lock in delete queue");
match *next_write_lock {

View File

@@ -9,7 +9,6 @@ use core::SegmentComponent;
use core::SegmentId;
use core::SegmentMeta;
use core::SegmentReader;
use directory::FileProtection;
use docset::DocSet;
use error::{Error, ErrorKind, Result, ResultExt};
use fastfield::write_delete_bitset;
@@ -216,15 +215,13 @@ pub fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: u64,
) -> Result<Option<FileProtection>> {
let mut file_protect: Option<FileProtection> = None;
) -> Result<()> {
{
if let Some(previous_opstamp) = segment_entry.meta().delete_opstamp() {
if segment_entry.meta().delete_opstamp() == Some(target_opstamp) {
// We are already up-to-date here.
if target_opstamp == previous_opstamp {
return Ok(file_protect);
}
return Ok(());
}
let segment_reader = SegmentReader::open(&segment)?;
let max_doc = segment_reader.max_doc();
@@ -243,6 +240,7 @@ pub fn advance_deletes(
target_opstamp,
)?;
// TODO optimize
for doc in 0u32..max_doc {
if segment_reader.is_deleted(doc) {
delete_bitset.insert(doc as usize);
@@ -251,14 +249,13 @@ pub fn advance_deletes(
let num_deleted_docs = delete_bitset.len();
if num_deleted_docs > 0 {
segment.set_delete_meta(num_deleted_docs as u32, target_opstamp);
file_protect = Some(segment.protect_from_delete(SegmentComponent::DELETE));
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
}
segment_entry.set_meta(segment.meta().clone());
Ok(file_protect)
segment_entry.set_meta((*segment.meta()).clone());
Ok(())
}
fn index_documents(
@@ -299,8 +296,7 @@ fn index_documents(
let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
let mut segment_meta = SegmentMeta::new(segment_id);
segment_meta.set_max_doc(num_docs);
let segment_meta = SegmentMeta::new(segment_id, num_docs);
let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
@@ -342,8 +338,7 @@ impl IndexWriter {
}
drop(self.workers_join_handle);
let result = self
.segment_updater
let result = self.segment_updater
.wait_merging_thread()
.chain_err(|| ErrorKind::ErrorInThread("Failed to join merging thread.".into()));
@@ -448,7 +443,9 @@ impl IndexWriter {
}
/// Merges a given list of segments
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Receiver<SegmentMeta> {
///
/// `segment_ids` is required to be non-empty.
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
self.segment_updater.start_merge(segment_ids)
}
@@ -488,8 +485,7 @@ impl IndexWriter {
let document_receiver = self.document_receiver.clone();
// take the directory lock to create a new index_writer.
let directory_lock = self
._directory_lock
let directory_lock = self._directory_lock
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");

View File

@@ -116,15 +116,17 @@ mod tests {
assert!(result_list.is_empty());
}
fn seg_meta(num_docs: u32) -> SegmentMeta {
let mut segment_metas = SegmentMeta::new(SegmentId::generate_random());
segment_metas.set_max_doc(num_docs);
segment_metas
fn create_random_segment_meta(num_docs: u32) -> SegmentMeta {
SegmentMeta::new(SegmentId::generate_random(), num_docs)
}
#[test]
fn test_log_merge_policy_pair() {
let test_input = vec![seg_meta(10), seg_meta(10), seg_meta(10)];
let test_input = vec![
create_random_segment_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(10),
];
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
assert_eq!(result_list.len(), 1);
}
@@ -137,17 +139,17 @@ mod tests {
// * one with the 3 * 1000-docs segments
// no MergeCandidate expected for the 2 * 10_000-docs segments as min_merge_size=3
let test_input = vec![
seg_meta(10),
seg_meta(10),
seg_meta(10),
seg_meta(1000),
seg_meta(1000),
seg_meta(1000),
seg_meta(10000),
seg_meta(10000),
seg_meta(10),
seg_meta(10),
seg_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(1000),
create_random_segment_meta(1000),
create_random_segment_meta(1000),
create_random_segment_meta(10000),
create_random_segment_meta(10000),
create_random_segment_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(10),
];
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
assert_eq!(result_list.len(), 2);
@@ -157,12 +159,12 @@ mod tests {
fn test_log_merge_policy_within_levels() {
// multiple levels all get merged correctly
let test_input = vec![
seg_meta(10), // log2(10) = ~3.32 (> 3.58 - 0.75)
seg_meta(11), // log2(11) = ~3.46
seg_meta(12), // log2(12) = ~3.58
seg_meta(800), // log2(800) = ~9.64 (> 9.97 - 0.75)
seg_meta(1000), // log2(1000) = ~9.97
seg_meta(1000),
create_random_segment_meta(10), // log2(10) = ~3.32 (> 3.58 - 0.75)
create_random_segment_meta(11), // log2(11) = ~3.46
create_random_segment_meta(12), // log2(12) = ~3.58
create_random_segment_meta(800), // log2(800) = ~9.64 (> 9.97 - 0.75)
create_random_segment_meta(1000), // log2(1000) = ~9.97
create_random_segment_meta(1000),
]; // log2(1000) = ~9.97
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
assert_eq!(result_list.len(), 2);
@@ -171,12 +173,12 @@ mod tests {
fn test_log_merge_policy_small_segments() {
// segments under min_layer_size are merged together
let test_input = vec![
seg_meta(1),
seg_meta(1),
seg_meta(1),
seg_meta(2),
seg_meta(2),
seg_meta(2),
create_random_segment_meta(1),
create_random_segment_meta(1),
create_random_segment_meta(1),
create_random_segment_meta(2),
create_random_segment_meta(2),
create_random_segment_meta(2),
];
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
assert_eq!(result_list.len(), 1);

View File

@@ -440,8 +440,7 @@ impl IndexMerger {
) -> Result<Option<TermOrdinalMapping>> {
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
let field_readers = self
.readers
let field_readers = self.readers
.iter()
.map(|reader| reader.inverted_index(indexed_field))
.collect::<Vec<_>>();
@@ -737,6 +736,7 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
@@ -980,6 +980,7 @@ mod tests {
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index.load_searchers().unwrap();
@@ -1076,6 +1077,7 @@ mod tests {
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index.load_searchers().unwrap();
@@ -1129,6 +1131,7 @@ mod tests {
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index.load_searchers().unwrap();
@@ -1219,6 +1222,7 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
@@ -1290,6 +1294,7 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
@@ -1392,6 +1397,7 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();

View File

@@ -2,6 +2,8 @@ use super::segment_register::SegmentRegister;
use core::SegmentId;
use core::SegmentMeta;
use core::{LOCKFILE_FILEPATH, META_FILEPATH};
use error::ErrorKind;
use error::Result as TantivyResult;
use indexer::delete_queue::DeleteCursor;
use indexer::SegmentEntry;
use std::collections::hash_set::HashSet;
@@ -64,8 +66,9 @@ impl SegmentManager {
/// Returns all of the segment entries (committed or uncommitted)
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
let mut segment_entries = self.read().uncommitted.segment_entries();
segment_entries.extend(self.read().committed.segment_entries());
let registers_lock = self.read();
let mut segment_entries = registers_lock.uncommitted.segment_entries();
segment_entries.extend(registers_lock.committed.segment_entries());
segment_entries
}
@@ -76,32 +79,15 @@ impl SegmentManager {
}
pub fn list_files(&self) -> HashSet<PathBuf> {
let registers_lock = self.read();
let mut files = HashSet::new();
files.insert(META_FILEPATH.clone());
files.insert(LOCKFILE_FILEPATH.clone());
let segment_metas: Vec<SegmentMeta> = registers_lock
.committed
.get_all_segments()
.into_iter()
.chain(registers_lock.uncommitted.get_all_segments().into_iter())
.chain(registers_lock.writing.iter().cloned().map(SegmentMeta::new))
.collect();
for segment_meta in segment_metas {
for segment_meta in SegmentMeta::all() {
files.extend(segment_meta.list_files());
}
files
}
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
let registers = self.read();
registers
.committed
.segment_entry(segment_id)
.or_else(|| registers.uncommitted.segment_entry(segment_id))
}
// Lock poisoning should never happen :
// The lock is acquired and released within this class,
// and the operations cannot panic.
@@ -126,19 +112,38 @@ impl SegmentManager {
}
}
pub fn start_merge(&self, segment_ids: &[SegmentId]) {
/// Marks a list of segments as in merge.
///
/// Returns an error if some segments are missing, or if
/// the `segment_ids` are not either all committed or all
/// uncommitted.
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
let mut registers_lock = self.write();
let mut segment_entries = vec![];
if registers_lock.uncommitted.contains_all(segment_ids) {
for segment_id in segment_ids {
registers_lock.uncommitted.start_merge(segment_id);
let segment_entry = registers_lock.uncommitted
.start_merge(segment_id)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
}
} else if registers_lock.committed.contains_all(segment_ids) {
for segment_id in segment_ids {
let segment_entry = registers_lock.committed
.start_merge(segment_id)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
}
for segment_id in segment_ids {
registers_lock.committed.start_merge(segment_id);
}
} else {
error!("Merge operation sent for segments that are not all uncommited or commited.");
let error_msg = "Merge operation sent for segments that are not \
all uncommited or commited."
.to_string();
bail!(ErrorKind::InvalidArgument(error_msg))
}
Ok(segment_entries)
}
pub fn cancel_merge(

View File

@@ -3,8 +3,7 @@ use core::SegmentMeta;
use indexer::delete_queue::DeleteCursor;
use indexer::segment_entry::SegmentEntry;
use std::collections::HashMap;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::fmt::{self, Debug, Formatter};
/// The segment register keeps track
/// of the list of segment, their size as well
@@ -39,13 +38,6 @@ impl SegmentRegister {
self.segment_states.len()
}
pub fn get_all_segments(&self) -> Vec<SegmentMeta> {
self.segment_states
.values()
.map(|segment_entry| segment_entry.meta().clone())
.collect()
}
pub fn get_mergeable_segments(&self) -> Vec<SegmentMeta> {
self.segment_states
.values()
@@ -59,8 +51,7 @@ impl SegmentRegister {
}
pub fn segment_metas(&self) -> Vec<SegmentMeta> {
let mut segment_ids: Vec<SegmentMeta> = self
.segment_states
let mut segment_ids: Vec<SegmentMeta> = self.segment_states
.values()
.map(|segment_entry| segment_entry.meta().clone())
.collect();
@@ -68,10 +59,6 @@ impl SegmentRegister {
segment_ids
}
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states.get(segment_id).cloned()
}
pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool {
segment_ids
.iter()
@@ -94,11 +81,13 @@ impl SegmentRegister {
.cancel_merge();
}
pub fn start_merge(&mut self, segment_id: &SegmentId) {
self.segment_states
.get_mut(segment_id)
.expect("Received a merge notification for a segment that is not registered")
.start_merge();
pub fn start_merge(&mut self, segment_id: &SegmentId) -> Option<SegmentEntry> {
if let Some(segment_entry) = self.segment_states.get_mut(segment_id) {
segment_entry.start_merge();
Some(segment_entry.clone())
} else {
None
}
}
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
@@ -110,6 +99,11 @@ impl SegmentRegister {
}
SegmentRegister { segment_states }
}
#[cfg(test)]
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states.get(segment_id).cloned()
}
}
#[cfg(test)]
@@ -138,7 +132,7 @@ mod tests {
let segment_id_merged = SegmentId::generate_random();
{
let segment_meta = SegmentMeta::new(segment_id_a);
let segment_meta = SegmentMeta::new(segment_id_a, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
@@ -151,7 +145,7 @@ mod tests {
);
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
{
let segment_meta = SegmentMeta::new(segment_id_b);
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
@@ -181,7 +175,7 @@ mod tests {
segment_register.remove_segment(&segment_id_a);
segment_register.remove_segment(&segment_id_b);
{
let segment_meta_merged = SegmentMeta::new(segment_id_merged);
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32);
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}

View File

@@ -7,11 +7,11 @@ use core::SegmentMeta;
use core::SerializableSegment;
use core::META_FILEPATH;
use directory::Directory;
use directory::FileProtection;
use error::{Error, ErrorKind, Result};
use error::{Error, ErrorKind, Result, ResultExt};
use futures::oneshot;
use futures::sync::oneshot::Receiver;
use futures::Future;
use futures_cpupool::Builder as CpuPoolBuilder;
use futures_cpupool::CpuFuture;
use futures_cpupool::CpuPool;
use indexer::delete_queue::DeleteCursor;
@@ -29,8 +29,7 @@ use std::collections::HashMap;
use std::io::Write;
use std::mem;
use std::ops::DerefMut;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
@@ -87,38 +86,19 @@ pub fn save_metas(
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
fn perform_merge(
segment_ids: &[SegmentId],
segment_updater: &SegmentUpdater,
index: &Index,
mut segment_entries: Vec<SegmentEntry>,
mut merged_segment: Segment,
target_opstamp: u64,
) -> Result<SegmentEntry> {
// first we need to apply deletes to our segment.
info!("Start merge: {:?}", segment_ids);
let index = &segment_updater.0.index;
// TODO add logging
let schema = index.schema();
let mut segment_entries = vec![];
let mut file_protections: Vec<FileProtection> = vec![];
for segment_id in segment_ids {
if let Some(mut segment_entry) = segment_updater.0.segment_manager.segment_entry(segment_id)
{
let segment = index.segment(segment_entry.meta().clone());
if let Some(file_protection) =
advance_deletes(segment, &mut segment_entry, target_opstamp)?
{
file_protections.push(file_protection);
}
segment_entries.push(segment_entry);
} else {
error!("Error, had to abort merge as some of the segment is not managed anymore.");
let msg = format!(
"Segment {:?} requested for merge is not managed.",
segment_id
);
bail!(ErrorKind::InvalidArgument(msg));
}
for segment_entry in &mut segment_entries {
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
}
let delete_cursor = segment_entries[0].delete_cursor().clone();
@@ -135,13 +115,13 @@ fn perform_merge(
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)
.expect("Creating index serializer failed");
.chain_err(|| "Creating index serializer failed")?;
let num_docs = merger
.write(segment_serializer)
.expect("Serializing merged index failed");
let mut segment_meta = SegmentMeta::new(merged_segment.id());
segment_meta.set_max_doc(num_docs);
.chain_err(|| "Serializing merged index failed")?;
let segment_meta = SegmentMeta::new(merged_segment.id(), num_docs);
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None);
Ok(after_merge_segment_entry)
@@ -167,8 +147,12 @@ impl SegmentUpdater {
) -> Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let pool = CpuPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)
.create();
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
pool: CpuPool::new(1),
pool,
index,
segment_manager,
merge_policy: RwLock::new(Box::new(DefaultMergePolicy::default())),
@@ -283,69 +267,85 @@ impl SegmentUpdater {
}).wait()
}
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Receiver<SegmentMeta> {
self.0.segment_manager.start_merge(segment_ids);
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
//let future_merged_segment = */
let segment_ids_vec = segment_ids.to_vec();
self.run_async(move |segment_updater| {
segment_updater.start_merge_impl(&segment_ids_vec[..])
}).wait()?
}
// `segment_ids` is required to be non-empty.
fn start_merge_impl(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty.");
let segment_updater_clone = self.clone();
let segment_entries: Vec<SegmentEntry> = self.0.segment_manager.start_merge(segment_ids)?;
let segment_ids_vec = segment_ids.to_vec();
let merging_thread_id = self.get_merging_thread_id();
info!(
"Starting merge thread #{} - {:?}",
merging_thread_id, segment_ids
);
let (merging_future_send, merging_future_recv) = oneshot();
if segment_ids.is_empty() {
return merging_future_recv;
}
let target_opstamp = self.0.stamper.stamp();
let merging_join_handle = thread::spawn(move || {
// first we need to apply deletes to our segment.
let merged_segment = segment_updater_clone.new_segment();
let merged_segment_id = merged_segment.id();
let merge_result = perform_merge(
&segment_ids_vec,
&segment_updater_clone,
merged_segment,
target_opstamp,
);
match merge_result {
Ok(after_merge_segment_entry) => {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(segment_ids_vec, after_merge_segment_entry)
.expect("Segment updater thread is corrupted.");
// first we need to apply deletes to our segment.
let merging_join_handle = thread::Builder::new()
.name(format!("mergingthread-{}", merging_thread_id))
.spawn(move || {
// first we need to apply deletes to our segment.
let merged_segment = segment_updater_clone.new_segment();
let merged_segment_id = merged_segment.id();
let merge_result = perform_merge(
&segment_updater_clone.0.index,
segment_entries,
merged_segment,
target_opstamp,
);
// the future may fail if the listener of the oneshot future
// has been destroyed.
//
// This is not a problem here, so we just ignore any
// possible error.
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(e) => {
error!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
match merge_result {
Ok(after_merge_segment_entry) => {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(segment_ids_vec, after_merge_segment_entry)
.expect("Segment updater thread is corrupted.");
// the future may fail if the listener of the oneshot future
// has been destroyed.
//
// This is not a problem here, so we just ignore any
// possible error.
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(e) => {
warn!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
// merging_future_send will be dropped, sending an error to the future.
}
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
// merging_future_send will be dropped, sending an error to the future.
}
}
segment_updater_clone
.0
.merging_threads
.write()
.unwrap()
.remove(&merging_thread_id);
Ok(())
});
segment_updater_clone
.0
.merging_threads
.write()
.unwrap()
.remove(&merging_thread_id);
Ok(())
})
.expect("Failed to spawn a thread.");
self.0
.merging_threads
.write()
.unwrap()
.insert(merging_thread_id, merging_join_handle);
merging_future_recv
Ok(merging_future_recv)
}
fn consider_merge_options(&self) {
@@ -358,8 +358,18 @@ impl SegmentUpdater {
let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
for MergeCandidate(segment_metas) in merge_candidates {
if let Err(e) = self.start_merge(&segment_metas).fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
match self.start_merge_impl(&segment_metas) {
Ok(merge_future) => {
if let Err(e) = merge_future.fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
}
}
Err(err) => {
warn!(
"Starting the merge failed for the following reason. This is not fatal. {}",
err
);
}
}
}
}
@@ -382,7 +392,6 @@ impl SegmentUpdater {
self.run_async(move |segment_updater| {
info!("End merge {:?}", after_merge_segment_entry.meta());
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
let mut _file_protection_opt = None;
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater
.0
@@ -393,29 +402,22 @@ impl SegmentUpdater {
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.0.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
match advance_deletes(
segment,
&mut after_merge_segment_entry,
committed_opstamp,
) {
Ok(file_protection_opt_res) => {
_file_protection_opt = file_protection_opt_res;
}
Err(e) => {
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
before_merge_segment_ids, e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater.cancel_merge(
&before_merge_segment_ids,
after_merge_segment_entry.segment_id(),
);
return;
if let Err(e) =
advance_deletes(segment, &mut after_merge_segment_entry, committed_opstamp)
{
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
before_merge_segment_ids, e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater.cancel_merge(
&before_merge_segment_ids,
after_merge_segment_entry.segment_id(),
);
return;
}
}
}

View File

@@ -180,6 +180,8 @@ mod macros;
pub use error::{Error, ErrorKind, ResultExt};
extern crate census;
/// Tantivy result.
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -94,8 +94,7 @@ impl MultiFieldPostingsWriter {
&self,
serializer: &mut InvertedIndexSerializer,
) -> Result<HashMap<Field, HashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self
.term_index
let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self.term_index
.iter()
.map(|(term_bytes, addr, bucket_id)| (term_bytes, addr, bucket_id as UnorderedTermId))
.collect();

View File

@@ -107,8 +107,7 @@ impl Recorder for TermFrequencyRecorder {
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
// the last document has not been closed...
// its term freq is self.current_tf.
let mut doc_iter = self
.stack
let mut doc_iter = self.stack
.iter(heap)
.chain(Some(self.current_tf).into_iter());

View File

@@ -399,8 +399,7 @@ impl BlockSegmentPostings {
/// Returns false iff there was no remaining blocks.
pub fn advance(&mut self) -> bool {
if self.num_bitpacked_blocks > 0 {
let num_consumed_bytes = self
.doc_decoder
let num_consumed_bytes = self.doc_decoder
.uncompress_block_sorted(self.remaining_data.as_ref(), self.doc_offset);
self.remaining_data.advance(num_consumed_bytes);
match self.freq_reading_option {
@@ -410,8 +409,7 @@ impl BlockSegmentPostings {
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self
.freq_decoder
let num_consumed_bytes = self.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref());
self.remaining_data.advance(num_consumed_bytes);
}

View File

@@ -160,8 +160,7 @@ impl<'a> FieldSerializer<'a> {
}
fn current_term_info(&self) -> TermInfo {
let (filepos, offset) = self
.positions_serializer_opt
let (filepos, offset) = self.positions_serializer_opt
.as_ref()
.map(|positions_serializer| positions_serializer.addr())
.unwrap_or((0u64, 0u8));
@@ -273,8 +272,7 @@ impl<W: Write> PostingsSerializer<W> {
if self.doc_ids.len() == COMPRESSION_BLOCK_SIZE {
{
// encode the doc ids
let block_encoded: &[u8] = self
.block_encoder
let block_encoded: &[u8] = self.block_encoder
.compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1];
self.postings_write.write_all(block_encoded)?;
@@ -300,16 +298,14 @@ impl<W: Write> PostingsSerializer<W> {
// In that case, the remaining part is encoded
// using variable int encoding.
{
let block_encoded = self
.block_encoder
let block_encoded = self.block_encoder
.compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.postings_write.write_all(block_encoded)?;
self.doc_ids.clear();
}
// ... Idem for term frequencies
if self.termfreq_enabled {
let block_encoded = self
.block_encoder
let block_encoded = self.block_encoder
.compress_vint_unsorted(&self.term_freqs[..]);
self.postings_write.write_all(block_encoded)?;
self.term_freqs.clear();

View File

@@ -41,8 +41,7 @@ impl From<Vec<(Occur, Box<Query>)>> for BooleanQuery {
impl Query for BooleanQuery {
fn weight(&self, searcher: &Searcher, scoring_enabled: bool) -> Result<Box<Weight>> {
let sub_weights = self
.subqueries
let sub_weights = self.subqueries
.iter()
.map(|&(ref occur, ref subquery)| {
Ok((*occur, subquery.weight(searcher, scoring_enabled)?))

View File

@@ -1,15 +1,20 @@
use query::Occur;
use schema::Field;
use schema::Term;
use schema::Type;
use std::fmt;
use std::ops::Bound;
use schema::Type;
#[derive(Clone)]
pub enum LogicalLiteral {
Term(Term),
Phrase(Vec<Term>),
Range { field: Field, value_type: Type, lower: Bound<Term>, upper: Bound<Term> },
Range {
field: Field,
value_type: Type,
lower: Bound<Term>,
upper: Bound<Term>,
},
All,
}
@@ -59,7 +64,11 @@ impl fmt::Debug for LogicalLiteral {
match *self {
LogicalLiteral::Term(ref term) => write!(formatter, "{:?}", term),
LogicalLiteral::Phrase(ref terms) => write!(formatter, "\"{:?}\"", terms),
LogicalLiteral::Range { ref lower, ref upper, .. } => write!(formatter, "({:?} TO {:?})", lower, upper),
LogicalLiteral::Range {
ref lower,
ref upper,
..
} => write!(formatter, "({:?} TO {:?})", lower, upper),
LogicalLiteral::All => write!(formatter, "*"),
}
}

View File

@@ -4,15 +4,16 @@ use combine::*;
use query::query_parser::user_input_ast::UserInputBound;
fn field<I: Stream<Item = char>>() -> impl Parser<Input = I, Output = String> {
(letter(), many(satisfy(|c: char| c.is_alphanumeric() || c == '_')))
.map(|(s1, s2): (char, String)| format!("{}{}", s1, s2))
(
letter(),
many(satisfy(|c: char| c.is_alphanumeric() || c == '_')),
).map(|(s1, s2): (char, String)| format!("{}{}", s1, s2))
}
fn word<I: Stream<Item = char>>() -> impl Parser<Input = I, Output = String> {
many1(satisfy(|c: char| c.is_alphanumeric()))
}
fn negative_number<I: Stream<Item = char>>() -> impl Parser<Input = I, Output = String> {
(char('-'), many1(satisfy(|c: char| c.is_numeric())))
.map(|(s1, s2): (char, String)| format!("{}{}", s1, s2))
@@ -45,9 +46,7 @@ where
}
fn range<I: Stream<Item = char>>(input: I) -> ParseResult<UserInputAST, I> {
let term_val = || {
word().or(negative_number())
};
let term_val = || word().or(negative_number());
let lower_bound = {
let excl = (char('{'), term_val()).map(|(_, w)| UserInputBound::Exclusive(w));
let incl = (char('['), term_val()).map(|(_, w)| UserInputBound::Inclusive(w));
@@ -59,8 +58,18 @@ fn range<I: Stream<Item = char>>(input: I) -> ParseResult<UserInputAST, I> {
// TODO: this backtracking should be unnecessary
try(excl).or(incl)
};
(optional((field(), char(':')).map(|x| x.0)), lower_bound, spaces(), string("TO"), spaces(), upper_bound)
.map(|(field, lower, _, _, _, upper)| UserInputAST::Range { field, lower, upper })
(
optional((field(), char(':')).map(|x| x.0)),
lower_bound,
spaces(),
string("TO"),
spaces(),
upper_bound,
).map(|(field, lower, _, _, _, upper)| UserInputAST::Range {
field,
lower,
upper,
})
.parse_stream(input)
}

View File

@@ -2,21 +2,21 @@ use super::logical_ast::*;
use super::query_grammar::parse_to_ast;
use super::user_input_ast::*;
use core::Index;
use query::AllQuery;
use query::BooleanQuery;
use query::Occur;
use query::PhraseQuery;
use query::Query;
use query::RangeQuery;
use query::TermQuery;
use schema::IndexRecordOption;
use schema::{Field, Schema};
use schema::{FieldType, Term};
use std::borrow::Cow;
use std::num::ParseIntError;
use std::ops::Bound;
use std::str::FromStr;
use tokenizer::TokenizerManager;
use std::ops::Bound;
use query::RangeQuery;
use query::AllQuery;
use std::borrow::Cow;
/// Possible error that may happen when parsing a query.
#[derive(Debug, PartialEq, Eq)]
@@ -177,7 +177,7 @@ impl QueryParser {
fn compute_terms_for_string(
&self,
field: Field,
phrase: &str
phrase: &str,
) -> Result<Vec<Term>, QueryParserError> {
let field_entry = self.schema.get_field_entry(field);
let field_type = field_entry.field_type();
@@ -240,9 +240,7 @@ impl QueryParser {
))
}
}
FieldType::HierarchicalFacet => {
Ok(vec![Term::from_field_text(field, phrase)])
}
FieldType::HierarchicalFacet => Ok(vec![Term::from_field_text(field, phrase)]),
FieldType::Bytes => {
let field_name = self.schema.get_field_name(field).to_string();
Err(QueryParserError::FieldNotIndexed(field_name))
@@ -258,7 +256,9 @@ impl QueryParser {
let terms = self.compute_terms_for_string(field, phrase)?;
match terms.len() {
0 => Ok(None),
1 => Ok(Some(LogicalLiteral::Term(terms.into_iter().next().unwrap()))),
1 => Ok(Some(LogicalLiteral::Term(
terms.into_iter().next().unwrap(),
))),
_ => Ok(Some(LogicalLiteral::Phrase(terms))),
}
}
@@ -271,10 +271,14 @@ impl QueryParser {
}
}
fn resolve_bound(&self, field: Field, bound: &UserInputBound) -> Result<Bound<Term>, QueryParserError> {
fn resolve_bound(
&self,
field: Field,
bound: &UserInputBound,
) -> Result<Bound<Term>, QueryParserError> {
let terms = self.compute_terms_for_string(field, bound.term_str())?;
if terms.len() != 1 {
return Err(QueryParserError::RangeMustNotHavePhrase)
return Err(QueryParserError::RangeMustNotHavePhrase);
}
let term = terms.into_iter().next().unwrap();
match *bound {
@@ -283,7 +287,10 @@ impl QueryParser {
}
}
fn resolved_fields(&self, given_field: &Option<String>) -> Result<Cow<[Field]>, QueryParserError> {
fn resolved_fields(
&self,
given_field: &Option<String>,
) -> Result<Cow<[Field]>, QueryParserError> {
match *given_field {
None => {
if self.default_fields.is_empty() {
@@ -291,7 +298,7 @@ impl QueryParser {
} else {
Ok(Cow::from(&self.default_fields[..]))
}
},
}
Some(ref field) => Ok(Cow::from(vec![self.resolve_field_name(&*field)?])),
}
}
@@ -319,28 +326,41 @@ impl QueryParser {
let (occur, logical_sub_queries) = self.compute_logical_ast_with_occur(*subquery)?;
Ok((compose_occur(Occur::Must, occur), logical_sub_queries))
}
UserInputAST::Range { field, lower, upper } => {
UserInputAST::Range {
field,
lower,
upper,
} => {
let fields = self.resolved_fields(&field)?;
let mut clauses = fields.iter().map(|&field| {
let field_entry = self.schema.get_field_entry(field);
let value_type = field_entry.field_type().value_type();
Ok(LogicalAST::Leaf(Box::new(LogicalLiteral::Range {
field,
value_type,
lower: self.resolve_bound(field, &lower)?,
upper: self.resolve_bound(field, &upper)?,
})))
}).collect::<Result<Vec<_>, QueryParserError>>()?;
let mut clauses = fields
.iter()
.map(|&field| {
let field_entry = self.schema.get_field_entry(field);
let value_type = field_entry.field_type().value_type();
Ok(LogicalAST::Leaf(Box::new(LogicalLiteral::Range {
field,
value_type,
lower: self.resolve_bound(field, &lower)?,
upper: self.resolve_bound(field, &upper)?,
})))
})
.collect::<Result<Vec<_>, QueryParserError>>()?;
let result_ast = if clauses.len() == 1 {
clauses.pop().unwrap()
} else {
LogicalAST::Clause(clauses.into_iter().map(|clause| (Occur::Should, clause)).collect())
LogicalAST::Clause(
clauses
.into_iter()
.map(|clause| (Occur::Should, clause))
.collect(),
)
};
Ok((Occur::Should, result_ast))
}
UserInputAST::All => {
Ok((Occur::Should, LogicalAST::Leaf(Box::new(LogicalLiteral::All))))
}
UserInputAST::All => Ok((
Occur::Should,
LogicalAST::Leaf(Box::new(LogicalLiteral::All)),
)),
UserInputAST::Leaf(literal) => {
let term_phrases: Vec<(Field, String)> = match literal.field_name {
Some(ref field_name) => {
@@ -403,9 +423,12 @@ fn convert_literal_to_query(logical_literal: LogicalLiteral) -> Box<Query> {
match logical_literal {
LogicalLiteral::Term(term) => Box::new(TermQuery::new(term, IndexRecordOption::WithFreqs)),
LogicalLiteral::Phrase(terms) => Box::new(PhraseQuery::new(terms)),
LogicalLiteral::Range { field, value_type, lower, upper } => {
Box::new(RangeQuery::new_term_bounds(field, value_type, lower, upper))
},
LogicalLiteral::Range {
field,
value_type,
lower,
upper,
} => Box::new(RangeQuery::new_term_bounds(field, value_type, lower, upper)),
LogicalLiteral::All => Box::new(AllQuery),
}
}
@@ -611,11 +634,7 @@ mod test {
Excluded(Term([0, 0, 0, 0, 116, 111, 116, 111])))",
false,
);
test_parse_query_to_logical_ast_helper(
"*",
"*",
false,
);
test_parse_query_to_logical_ast_helper("*", "*", false);
}
#[test]

View File

@@ -46,7 +46,11 @@ pub enum UserInputAST {
Clause(Vec<Box<UserInputAST>>),
Not(Box<UserInputAST>),
Must(Box<UserInputAST>),
Range { field: Option<String>, lower: UserInputBound, upper: UserInputBound },
Range {
field: Option<String>,
lower: UserInputBound,
upper: UserInputBound,
},
All,
Leaf(Box<UserInputLiteral>),
}
@@ -75,7 +79,11 @@ impl fmt::Debug for UserInputAST {
Ok(())
}
UserInputAST::Not(ref subquery) => write!(formatter, "-({:?})", subquery),
UserInputAST::Range { ref field, ref lower, ref upper } => {
UserInputAST::Range {
ref field,
ref lower,
ref upper,
} => {
if let &Some(ref field) = field {
write!(formatter, "{}:", field)?;
}
@@ -83,7 +91,7 @@ impl fmt::Debug for UserInputAST {
write!(formatter, " TO ")?;
upper.display_upper(formatter)?;
Ok(())
},
}
UserInputAST::All => write!(formatter, "*"),
UserInputAST::Leaf(ref subquery) => write!(formatter, "{:?}", subquery),
}

View File

@@ -97,7 +97,7 @@ impl RangeQuery {
field: Field,
value_type: Type,
left_bound: Bound<Term>,
right_bound: Bound<Term>
right_bound: Bound<Term>,
) -> RangeQuery {
let verify_and_unwrap_term = |val: &Term| {
assert_eq!(field, val.field());

View File

@@ -72,8 +72,7 @@ impl<T: BinarySerializable> SkipListBuilder<T> {
let mut skip_pointer = self.data_layer.insert(key, dest)?;
loop {
skip_pointer = match skip_pointer {
Some((skip_doc_id, skip_offset)) => self
.get_skip_layer(layer_id)
Some((skip_doc_id, skip_offset)) => self.get_skip_layer(layer_id)
.insert(skip_doc_id, &skip_offset)?,
None => {
return Ok(());

View File

@@ -164,8 +164,7 @@ impl TermDictionary {
let fst = self.fst_index.as_fst();
let mut node = fst.root();
while ord != 0 || !node.is_final() {
if let Some(transition) = node
.transitions()
if let Some(transition) = node.transitions()
.take_while(|transition| transition.out.value() <= ord)
.last()
{