Compare commits

...

12 Commits
0.6.0 ... 0.6.1

Author SHA1 Message Date
Paul Masurel
31655e92d7 Preparing release 0.6.1 2018-07-10 09:12:26 +09:00
Paul Masurel
6b8d76685a Tiny refactoring 2018-07-05 09:11:55 +09:00
Paul Masurel
ce5683fc6a Removed useless counting_writer 2018-07-04 16:13:19 +09:00
Paul Masurel
5205579db6 Merge branch 'master' of github.com:tantivy-search/tantivy 2018-07-04 16:09:59 +09:00
Paul Masurel
d056ae60dc Removed SourceRead. Relying on the new owned-read crate instead (#332) 2018-07-04 16:08:52 +09:00
Paul Masurel
af9280c95f Removed SourceRead. Relying on the new owned-read crate instead 2018-07-04 12:47:25 +09:00
David Hewson
2e538ce6e6 remove extra space in name (#331)
the extra space that appeared breaks using the package
2018-07-02 05:32:19 +09:00
Jason Wolfe
00466d2b08 #328: Support parsing unbounded range queries (#329)
* #328: Support parsing unbounded range queries. Update CHANGELOG.md for query parser changes.

* Set version to 0.7-dev
2018-06-30 13:24:02 +09:00
Paul Masurel
8ebbf6b336 Issue/325 (#330)
* Introducing a SegmentMeta inventory.
* Depending on census=0.1
* Cargo fmt
2018-06-30 13:11:41 +09:00
Paul Masurel
1ce36bb211 Merge branch 'master' of github.com:tantivy-search/tantivy 2018-06-27 16:58:47 +09:00
Jason Wolfe
2ac43bf21b Support parsing RangeQuery and AllQuery in Queryparser (#323)
* (#321) Add support for range query parsing to grammar / parser. Still needs to be wired through the rest of the way.

* (321) Finish wiring RangeQuery parsing through

* (#321) Add logical AST query parser tests for RangeQuery

* (#321) Support parsing AllQuery

* (#321) Update documentation of QueryParser

* (#321) Support negative numbers in range query parsing
2018-06-25 08:29:47 +09:00
Paul Masurel
3fd8c2aa5a Removed one keyword 2018-06-22 14:47:21 +09:00
43 changed files with 749 additions and 591 deletions

View File

@@ -1,7 +1,18 @@
Tantivy 0.6.1
=========================
- Bugfix #324. The GC was removing files that were still in use
- Added support for parsing AllQuery and RangeQuery via QueryParser
- AllQuery: `*`
- RangeQuery:
- Inclusive `field:[startIncl to endIncl]`
- Exclusive `field:{startExcl to endExcl}`
- Mixed `field:[startIncl to endExcl}` and vice versa
- Unbounded `field:[start to *]`, `field:[* to end]`
Tantivy 0.6
==========================
Special thanks to @drusellers and @jason-wolfe for their contributions
to this release!

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.6.0"
version = "0.6.1"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -9,7 +9,7 @@ documentation = "https://tantivy-search.github.io/tantivy/tantivy/index.html"
homepage = "https://github.com/tantivy-search/tantivy"
repository = "https://github.com/tantivy-search/tantivy"
readme = "README.md"
keywords = ["search", "search engine", "information", "retrieval"]
keywords = ["search", "information", "retrieval"]
[dependencies]
base64 = "0.9.1"
@@ -45,7 +45,9 @@ rust-stemmers = "0.1.0"
downcast = { version="0.9" }
matches = "0.1"
bitpacking = "0.5"
census = "0.1"
fnv = "1.0.6"
owned-read = "0.1"
[target.'cfg(windows)'.dependencies]
winapi = "0.2"

View File

@@ -4,7 +4,7 @@
os: Visual Studio 2015
environment:
matrix:
- channel: nightly
- channel: stable
target: x86_64-pc-windows-msvc
install:

View File

@@ -342,19 +342,16 @@ impl FacetCollector {
pub fn harvest(mut self) -> FacetCounts {
self.finalize_segment();
let collapsed_facet_ords: Vec<&[u64]> = self
.segment_counters
let collapsed_facet_ords: Vec<&[u64]> = self.segment_counters
.iter()
.map(|segment_counter| &segment_counter.facet_ords[..])
.collect();
let collapsed_facet_counts: Vec<&[u64]> = self
.segment_counters
let collapsed_facet_counts: Vec<&[u64]> = self.segment_counters
.iter()
.map(|segment_counter| &segment_counter.facet_counts[..])
.collect();
let facet_streams = self
.segment_counters
let facet_streams = self.segment_counters
.iter()
.map(|seg_counts| seg_counts.facet_reader.facet_dict().range().into_stream())
.collect::<Vec<_>>();
@@ -405,8 +402,7 @@ impl Collector for FacetCollector {
fn collect(&mut self, doc: DocId, _: Score) {
let facet_reader: &mut FacetReader = unsafe {
&mut *self
.ff_reader
&mut *self.ff_reader
.as_ref()
.expect("collect() was called before set_segment. This should never happen.")
.get()

View File

@@ -161,13 +161,11 @@ impl Collector for TopCollector {
fn collect(&mut self, doc: DocId, score: Score) {
if self.at_capacity() {
// It's ok to unwrap as long as a limit of 0 is forbidden.
let limit_doc: GlobalScoredDoc = *self
.heap
let limit_doc: GlobalScoredDoc = *self.heap
.peek()
.expect("Top collector with size 0 is forbidden");
if limit_doc.score < score {
let mut mut_head = self
.heap
let mut mut_head = self.heap
.peek_mut()
.expect("Top collector with size 0 is forbidden");
mut_head.score = score;

View File

@@ -72,8 +72,7 @@ impl<W: Write> CompositeWrite<W> {
let footer_offset = self.write.written_bytes();
VInt(self.offsets.len() as u64).serialize(&mut self.write)?;
let mut offset_fields: Vec<_> = self
.offsets
let mut offset_fields: Vec<_> = self.offsets
.iter()
.map(|(file_addr, offset)| (*offset, *file_addr))
.collect();

View File

@@ -34,8 +34,7 @@ impl BlockEncoder {
let num_bits = self.bitpacker.num_bits_sorted(offset, block);
self.output[0] = num_bits;
let written_size =
1 + self
.bitpacker
1 + self.bitpacker
.compress_sorted(offset, block, &mut self.output[1..], num_bits);
&self.output[..written_size]
}
@@ -43,8 +42,7 @@ impl BlockEncoder {
pub fn compress_block_unsorted(&mut self, block: &[u32]) -> &[u8] {
let num_bits = self.bitpacker.num_bits(block);
self.output[0] = num_bits;
let written_size = 1 + self
.bitpacker
let written_size = 1 + self.bitpacker
.compress(block, &mut self.output[1..], num_bits);
&self.output[..written_size]
}
@@ -85,8 +83,7 @@ impl BlockDecoder {
pub fn uncompress_block_unsorted<'a>(&mut self, compressed_data: &'a [u8]) -> usize {
let num_bits = compressed_data[0];
self.output_len = COMPRESSION_BLOCK_SIZE;
1 + self
.bitpacker
1 + self.bitpacker
.decompress(&compressed_data[1..], &mut self.output, num_bits)
}

View File

@@ -1,7 +1,8 @@
use compression::compressed_block_size;
use compression::BlockDecoder;
use compression::COMPRESSION_BLOCK_SIZE;
use directory::{ReadOnlySource, SourceRead};
use directory::ReadOnlySource;
use owned_read::OwnedRead;
/// Reads a stream of compressed ints.
///
@@ -10,7 +11,7 @@ use directory::{ReadOnlySource, SourceRead};
/// The `.skip(...)` makes it possible to avoid
/// decompressing blocks that are not required.
pub struct CompressedIntStream {
buffer: SourceRead,
buffer: OwnedRead,
block_decoder: BlockDecoder,
cached_addr: usize, // address of the currently decoded block
@@ -24,7 +25,7 @@ impl CompressedIntStream {
/// Opens a compressed int stream.
pub(crate) fn wrap(source: ReadOnlySource) -> CompressedIntStream {
CompressedIntStream {
buffer: SourceRead::from(source),
buffer: OwnedRead::new(source),
block_decoder: BlockDecoder::new(),
cached_addr: usize::max_value(),
cached_next_addr: usize::max_value(),
@@ -42,8 +43,7 @@ impl CompressedIntStream {
// no need to read.
self.cached_next_addr
} else {
let next_addr = addr + self
.block_decoder
let next_addr = addr + self.block_decoder
.uncompress_block_unsorted(self.buffer.slice_from(addr));
self.cached_addr = addr;
self.cached_next_addr = next_addr;

View File

@@ -191,8 +191,7 @@ impl Index {
/// Returns the list of segments that are searchable
pub fn searchable_segments(&self) -> Result<Vec<Segment>> {
Ok(self
.searchable_segment_metas()?
Ok(self.searchable_segment_metas()?
.into_iter()
.map(|segment_meta| self.segment(segment_meta))
.collect())
@@ -205,8 +204,8 @@ impl Index {
/// Creates a new segment.
pub fn new_segment(&self) -> Segment {
let segment_meta = SegmentMeta::new(SegmentId::generate_random());
create_segment(self.clone(), segment_meta)
let segment_meta = SegmentMeta::new(SegmentId::generate_random(), 0);
self.segment(segment_meta)
}
/// Return a reference to the index directory.
@@ -227,8 +226,7 @@ impl Index {
/// Returns the list of segment ids that are searchable.
pub fn searchable_segment_ids(&self) -> Result<Vec<SegmentId>> {
Ok(self
.searchable_segment_metas()?
Ok(self.searchable_segment_metas()?
.iter()
.map(|segment_meta| segment_meta.id())
.collect())

View File

@@ -1,6 +1,6 @@
use common::BinarySerializable;
use compression::CompressedIntStream;
use directory::{ReadOnlySource, SourceRead};
use directory::ReadOnlySource;
use postings::FreqReadingOption;
use postings::TermInfo;
use postings::{BlockSegmentPostings, SegmentPostings};
@@ -8,6 +8,7 @@ use schema::FieldType;
use schema::IndexRecordOption;
use schema::Term;
use termdict::TermDictionary;
use owned_read::OwnedRead;
/// The inverted index reader is in charge of accessing
/// the inverted index associated to a specific field.
@@ -92,7 +93,7 @@ impl InvertedIndexReader {
let offset = term_info.postings_offset as usize;
let end_source = self.postings_source.len();
let postings_slice = self.postings_source.slice(offset, end_source);
let postings_reader = SourceRead::from(postings_slice);
let postings_reader = OwnedRead::new(postings_slice);
block_postings.reset(term_info.doc_freq as usize, postings_reader);
}
@@ -114,7 +115,7 @@ impl InvertedIndexReader {
};
BlockSegmentPostings::from_data(
term_info.doc_freq as usize,
SourceRead::from(postings_data),
OwnedRead::new(postings_data),
freq_reading_option,
)
}

View File

@@ -87,8 +87,7 @@ impl<T> Deref for LeasedItem<T> {
type Target = T;
fn deref(&self) -> &T {
&self
.gen_item
&self.gen_item
.as_ref()
.expect("Unwrapping a leased item should never fail")
.item // unwrap is safe here
@@ -97,8 +96,7 @@ impl<T> Deref for LeasedItem<T> {
impl<T> DerefMut for LeasedItem<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self
.gen_item
&mut self.gen_item
.as_mut()
.expect("Unwrapping a mut leased item should never fail")
.item // unwrap is safe here

View File

@@ -78,8 +78,7 @@ impl Searcher {
/// Return the field searcher associated to a `Field`.
pub fn field(&self, field: Field) -> FieldSearcher {
let inv_index_readers = self
.segment_readers
let inv_index_readers = self.segment_readers
.iter()
.map(|segment_reader| segment_reader.inverted_index(field))
.collect::<Vec<_>>();
@@ -99,8 +98,7 @@ impl FieldSearcher {
/// Returns a Stream over all of the sorted unique terms of
/// for the given field.
pub fn terms(&self) -> TermMerger {
let term_streamers: Vec<_> = self
.inv_index_readers
let term_streamers: Vec<_> = self.inv_index_readers
.iter()
.map(|inverted_index| inverted_index.terms().stream())
.collect();
@@ -110,8 +108,7 @@ impl FieldSearcher {
impl fmt::Debug for Searcher {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let segment_ids = self
.segment_readers
let segment_ids = self.segment_readers
.iter()
.map(|segment_reader| segment_reader.segment_id())
.collect::<Vec<_>>();

View File

@@ -4,7 +4,7 @@ use core::SegmentId;
use core::SegmentMeta;
use directory::error::{OpenReadError, OpenWriteError};
use directory::Directory;
use directory::{FileProtection, ReadOnlySource, WritePtr};
use directory::{ReadOnlySource, WritePtr};
use indexer::segment_serializer::SegmentSerializer;
use schema::Schema;
use std::fmt;
@@ -28,6 +28,7 @@ impl fmt::Debug for Segment {
/// Creates a new segment given an `Index` and a `SegmentId`
///
/// The function is here to make it private outside `tantivy`.
/// #[doc(hidden)]
pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment {
Segment { index, meta }
}
@@ -49,8 +50,11 @@ impl Segment {
}
#[doc(hidden)]
pub fn set_delete_meta(&mut self, num_deleted_docs: u32, opstamp: u64) {
self.meta.set_delete_meta(num_deleted_docs, opstamp);
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),
}
}
/// Returns the segment's id.
@@ -66,16 +70,6 @@ impl Segment {
self.meta.relative_path(component)
}
/// Protects a specific component file from being deleted.
///
/// Returns a FileProtection object. The file is guaranteed
/// to not be garbage collected as long as this `FileProtection` object
/// lives.
pub fn protect_from_delete(&self, component: SegmentComponent) -> FileProtection {
let path = self.relative_path(component);
self.index.directory().protect_file_from_delete(&path)
}
/// Open one of the component file for a *regular* read.
pub fn open_read(
&self,
@@ -105,35 +99,3 @@ pub trait SerializableSegment {
/// The number of documents in the segment.
fn write(&self, serializer: SegmentSerializer) -> Result<u32>;
}
#[cfg(test)]
mod tests {
use core::SegmentComponent;
use directory::Directory;
use schema::SchemaBuilder;
use std::collections::HashSet;
use Index;
#[test]
fn test_segment_protect_component() {
let mut index = Index::create_in_ram(SchemaBuilder::new().build());
let segment = index.new_segment();
let path = segment.relative_path(SegmentComponent::POSTINGS);
let directory = index.directory_mut();
directory.atomic_write(&*path, &vec![0u8]).unwrap();
let living_files = HashSet::new();
{
let _file_protection = segment.protect_from_delete(SegmentComponent::POSTINGS);
assert!(directory.exists(&*path));
directory.garbage_collect(|| living_files.clone());
assert!(directory.exists(&*path));
}
directory.garbage_collect(|| living_files);
assert!(!directory.exists(&*path));
}
}

View File

@@ -1,8 +1,15 @@
use super::SegmentComponent;
use census::{Inventory, TrackedObject};
use core::SegmentId;
use serde;
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
lazy_static! {
static ref INVENTORY: Inventory<InnerSegmentMeta> = { Inventory::new() };
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
num_deleted_docs: u32,
@@ -13,32 +20,72 @@ struct DeleteMeta {
///
/// For instance the number of docs it contains,
/// how many are deleted, etc.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone)]
pub struct SegmentMeta {
segment_id: SegmentId,
max_doc: u32,
deletes: Option<DeleteMeta>,
tracked: TrackedObject<InnerSegmentMeta>,
}
impl fmt::Debug for SegmentMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
self.tracked.fmt(f)
}
}
impl serde::Serialize for SegmentMeta {
fn serialize<S>(
&self,
serializer: S,
) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
where
S: serde::Serializer,
{
self.tracked.serialize(serializer)
}
}
impl<'a> serde::Deserialize<'a> for SegmentMeta {
fn deserialize<D>(deserializer: D) -> Result<Self, <D as serde::Deserializer<'a>>::Error>
where
D: serde::Deserializer<'a>,
{
let inner = InnerSegmentMeta::deserialize(deserializer)?;
let tracked = INVENTORY.track(inner);
Ok(SegmentMeta { tracked: tracked })
}
}
impl SegmentMeta {
/// Creates a new segment meta for
/// a segment with no deletes and no documents.
pub fn new(segment_id: SegmentId) -> SegmentMeta {
SegmentMeta {
/// Lists all living `SegmentMeta` object at the time of the call.
pub fn all() -> Vec<SegmentMeta> {
INVENTORY
.list()
.into_iter()
.map(|inner| SegmentMeta { tracked: inner })
.collect::<Vec<_>>()
}
/// Creates a new `SegmentMeta` object.
#[doc(hidden)]
pub fn new(segment_id: SegmentId, max_doc: u32) -> SegmentMeta {
let inner = InnerSegmentMeta {
segment_id,
max_doc: 0,
max_doc,
deletes: None,
};
SegmentMeta {
tracked: INVENTORY.track(inner),
}
}
/// Returns the segment id.
pub fn id(&self) -> SegmentId {
self.segment_id
self.tracked.segment_id
}
/// Returns the number of deleted documents.
pub fn num_deleted_docs(&self) -> u32 {
self.deletes
self.tracked
.deletes
.as_ref()
.map(|delete_meta| delete_meta.num_deleted_docs)
.unwrap_or(0u32)
@@ -80,7 +127,7 @@ impl SegmentMeta {
/// and all the doc ids contains in this segment
/// are exactly (0..max_doc).
pub fn max_doc(&self) -> u32 {
self.max_doc
self.tracked.max_doc
}
/// Return the number of documents in the segment.
@@ -91,25 +138,36 @@ impl SegmentMeta {
/// Returns the opstamp of the last delete operation
/// taken in account in this segment.
pub fn delete_opstamp(&self) -> Option<u64> {
self.deletes.as_ref().map(|delete_meta| delete_meta.opstamp)
self.tracked
.deletes
.as_ref()
.map(|delete_meta| delete_meta.opstamp)
}
/// Returns true iff the segment meta contains
/// delete information.
pub fn has_deletes(&self) -> bool {
self.deletes.is_some()
self.num_deleted_docs() > 0
}
#[doc(hidden)]
pub fn set_max_doc(&mut self, max_doc: u32) {
self.max_doc = max_doc;
}
#[doc(hidden)]
pub fn set_delete_meta(&mut self, num_deleted_docs: u32, opstamp: u64) {
self.deletes = Some(DeleteMeta {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta {
let delete_meta = DeleteMeta {
num_deleted_docs,
opstamp,
};
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
segment_id: inner_meta.segment_id,
max_doc: inner_meta.max_doc,
deletes: Some(delete_meta),
});
SegmentMeta { tracked }
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct InnerSegmentMeta {
segment_id: SegmentId,
max_doc: u32,
deletes: Option<DeleteMeta>,
}

View File

@@ -156,13 +156,11 @@ impl SegmentReader {
&FieldType::Bytes => {}
_ => return Err(FastFieldNotAvailableError::new(field_entry)),
}
let idx_reader = self
.fast_fields_composite
let idx_reader = self.fast_fields_composite
.open_read_with_idx(field, 0)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))
.map(FastFieldReader::open)?;
let values = self
.fast_fields_composite
let values = self.fast_fields_composite
.open_read_with_idx(field, 1)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))?;
Ok(BytesFastFieldReader::open(idx_reader, values))
@@ -274,8 +272,7 @@ impl SegmentReader {
/// term dictionary associated to a specific field,
/// and opening the posting list associated to any term.
pub fn inverted_index(&self, field: Field) -> Arc<InvertedIndexReader> {
if let Some(inv_idx_reader) = self
.inv_idx_reader_cache
if let Some(inv_idx_reader) = self.inv_idx_reader_cache
.read()
.expect("Lock poisoned. This should never happen")
.get(&field)
@@ -304,13 +301,11 @@ impl SegmentReader {
let postings_source = postings_source_opt.unwrap();
let termdict_source = self
.termdict_composite
let termdict_source = self.termdict_composite
.open_read(field)
.expect("Failed to open field term dictionary in composite file. Is the field indexed");
let positions_source = self
.positions_composite
let positions_source = self.positions_composite
.open_read(field)
.expect("Index corrupted. Failed to open field positions in composite file.");

View File

@@ -173,9 +173,6 @@ pub enum DeleteError {
/// Any kind of IO error that happens when
/// interacting with the underlying IO device.
IOError(IOError),
/// The file may not be deleted because it is
/// protected.
FileProtected(PathBuf),
}
impl From<IOError> for DeleteError {
@@ -190,9 +187,6 @@ impl fmt::Display for DeleteError {
DeleteError::FileDoesNotExist(ref path) => {
write!(f, "the file '{:?}' does not exist", path)
}
DeleteError::FileProtected(ref path) => {
write!(f, "the file '{:?}' is protected and can't be deleted", path)
}
DeleteError::IOError(ref err) => {
write!(f, "an io error occurred while deleting a file: '{}'", err)
}
@@ -207,7 +201,7 @@ impl StdError for DeleteError {
fn cause(&self) -> Option<&StdError> {
match *self {
DeleteError::FileDoesNotExist(_) | DeleteError::FileProtected(_) => None,
DeleteError::FileDoesNotExist(_) => None,
DeleteError::IOError(ref err) => Some(err),
}
}

View File

@@ -3,9 +3,7 @@ use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
use directory::{ReadOnlySource, WritePtr};
use error::{ErrorKind, Result, ResultExt};
use serde_json;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
use std::io;
use std::io::Write;
use std::path::{Path, PathBuf};
@@ -32,37 +30,6 @@ pub struct ManagedDirectory {
#[derive(Debug, Default)]
struct MetaInformation {
managed_paths: HashSet<PathBuf>,
protected_files: HashMap<PathBuf, usize>,
}
/// A `FileProtection` prevents the garbage collection of a file.
///
/// See `ManagedDirectory.protect_file_from_delete`.
pub struct FileProtection {
directory: ManagedDirectory,
path: PathBuf,
}
fn unprotect_file_from_delete(directory: &ManagedDirectory, path: &Path) {
let mut meta_informations_wlock = directory
.meta_informations
.write()
.expect("Managed file lock poisoned");
if let Some(counter_ref_mut) = meta_informations_wlock.protected_files.get_mut(path) {
(*counter_ref_mut) -= 1;
}
}
impl fmt::Debug for FileProtection {
fn fmt(&self, formatter: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(formatter, "FileProtectionFor({:?})", self.path)
}
}
impl Drop for FileProtection {
fn drop(&mut self) {
unprotect_file_from_delete(&self.directory, &*self.path);
}
}
/// Saves the file containing the list of existing files
@@ -89,7 +56,6 @@ impl ManagedDirectory {
directory: Box::new(directory),
meta_informations: Arc::new(RwLock::new(MetaInformation {
managed_paths: managed_files,
protected_files: HashMap::default(),
})),
})
}
@@ -117,8 +83,7 @@ impl ManagedDirectory {
let mut files_to_delete = vec![];
{
// releasing the lock as .delete() will use it too.
let meta_informations_rlock = self
.meta_informations
let meta_informations_rlock = self.meta_informations
.read()
.expect("Managed directory rlock poisoned in garbage collect.");
@@ -159,9 +124,6 @@ impl ManagedDirectory {
error!("Failed to delete {:?}", file_to_delete);
}
}
DeleteError::FileProtected(_) => {
// this is expected.
}
}
}
}
@@ -171,8 +133,7 @@ impl ManagedDirectory {
if !deleted_files.is_empty() {
// update the list of managed files by removing
// the file that were removed.
let mut meta_informations_wlock = self
.meta_informations
let mut meta_informations_wlock = self.meta_informations
.write()
.expect("Managed directory wlock poisoned (2).");
{
@@ -187,29 +148,6 @@ impl ManagedDirectory {
}
}
/// Protects a file from being garbage collected.
///
/// The method returns a `FileProtection` object.
/// The file will not be garbage collected as long as the
/// `FileProtection` object is kept alive.
pub fn protect_file_from_delete(&self, path: &Path) -> FileProtection {
let pathbuf = path.to_owned();
{
let mut meta_informations_wlock = self
.meta_informations
.write()
.expect("Managed file lock poisoned on protect");
*meta_informations_wlock
.protected_files
.entry(pathbuf.clone())
.or_insert(0) += 1;
}
FileProtection {
directory: self.clone(),
path: pathbuf.clone(),
}
}
/// Registers a file as managed
///
/// This method must be called before the file is
@@ -218,8 +156,7 @@ impl ManagedDirectory {
/// will not lead to garbage files that will
/// never get removed.
fn register_file_as_managed(&mut self, filepath: &Path) -> io::Result<()> {
let mut meta_wlock = self
.meta_informations
let mut meta_wlock = self.meta_informations
.write()
.expect("Managed file lock poisoned");
let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());
@@ -251,17 +188,6 @@ impl Directory for ManagedDirectory {
}
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
{
let metas_rlock = self
.meta_informations
.read()
.expect("poisoned lock in managed directory meta");
if let Some(counter) = metas_rlock.protected_files.get(path) {
if *counter > 0 {
return Err(DeleteError::FileProtected(path.to_owned()));
}
}
}
self.directory.delete(path)
}
@@ -377,28 +303,4 @@ mod tests {
}
}
#[test]
#[cfg(feature = "mmap")]
fn test_managed_directory_protect() {
let tempdir = TempDir::new("index").unwrap();
let tempdir_path = PathBuf::from(tempdir.path());
let living_files = HashSet::new();
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::new(mmap_directory).unwrap();
managed_directory
.atomic_write(*TEST_PATH1, &vec![0u8, 1u8])
.unwrap();
assert!(managed_directory.exists(*TEST_PATH1));
{
let _file_protection = managed_directory.protect_file_from_delete(*TEST_PATH1);
managed_directory.garbage_collect(|| living_files.clone());
assert!(managed_directory.exists(*TEST_PATH1));
}
managed_directory.garbage_collect(|| living_files.clone());
assert!(!managed_directory.exists(*TEST_PATH1));
}
}

View File

@@ -32,8 +32,7 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadE
}
})?;
let meta_data = file
.metadata()
let meta_data = file.metadata()
.map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
if meta_data.len() == 0 {
// if the file size is 0, it will not be possible
@@ -310,8 +309,7 @@ impl Directory for MmapDirectory {
// when the last reference is gone.
mmap_cache.cache.remove(&full_path);
match fs::remove_file(&full_path) {
Ok(_) => self
.sync_directory()
Ok(_) => self.sync_directory()
.map_err(|e| IOError::with_path(path.to_owned(), e).into()),
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {

View File

@@ -25,8 +25,7 @@ pub use self::read_only_source::ReadOnlySource;
#[cfg(feature = "mmap")]
pub use self::mmap_directory::MmapDirectory;
pub(crate) use self::managed_directory::{FileProtection, ManagedDirectory};
pub(crate) use self::read_only_source::SourceRead;
pub(crate) use self::managed_directory::ManagedDirectory;
/// Synonym of Seek + Write
pub trait SeekableWrite: Seek + Write {}

View File

@@ -170,8 +170,7 @@ impl Directory for RAMDirectory {
let path_buf = PathBuf::from(path);
let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone());
let exists = self
.fs
let exists = self.fs
.write(path_buf.clone(), &Vec::new())
.map_err(|err| IOError::with_path(path.to_owned(), err))?;

View File

@@ -3,9 +3,8 @@ use common::HasLen;
#[cfg(feature = "mmap")]
use fst::raw::MmapReadOnly;
use stable_deref_trait::{CloneStableDeref, StableDeref};
use std::io::{self, Read};
use std::ops::Deref;
use std::slice;
/// Read object that represents files in tantivy.
///
@@ -120,49 +119,3 @@ impl From<Vec<u8>> for ReadOnlySource {
ReadOnlySource::Anonymous(shared_data)
}
}
/// Acts as a owning cursor over the data backed up by a `ReadOnlySource`
pub(crate) struct SourceRead {
_data_owner: ReadOnlySource,
cursor: &'static [u8],
}
impl SourceRead {
// Advance the cursor by a given number of bytes.
pub fn advance(&mut self, len: usize) {
self.cursor = &self.cursor[len..];
}
pub fn slice_from(&self, start: usize) -> &[u8] {
&self.cursor[start..]
}
pub fn get(&self, idx: usize) -> u8 {
self.cursor[idx]
}
}
impl AsRef<[u8]> for SourceRead {
fn as_ref(&self) -> &[u8] {
self.cursor
}
}
impl From<ReadOnlySource> for SourceRead {
// Creates a new `SourceRead` from a given `ReadOnlySource`
fn from(source: ReadOnlySource) -> SourceRead {
let len = source.len();
let slice_ptr = source.as_slice().as_ptr();
let static_slice = unsafe { slice::from_raw_parts(slice_ptr, len) };
SourceRead {
_data_owner: source,
cursor: static_slice,
}
}
}
impl Read for SourceRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.cursor.read(buf)
}
}

View File

@@ -41,8 +41,7 @@ pub struct DeleteBitSet {
impl DeleteBitSet {
/// Opens a delete bitset given its data source.
pub fn open(data: ReadOnlySource) -> DeleteBitSet {
let num_deleted: usize = data
.as_slice()
let num_deleted: usize = data.as_slice()
.iter()
.map(|b| b.count_ones() as usize)
.sum();

View File

@@ -56,8 +56,7 @@ impl FacetReader {
/// Given a term ordinal returns the term associated to it.
pub fn facet_from_ord(&self, facet_ord: TermOrdinal, output: &mut Facet) {
let found_term = self
.term_dict
let found_term = self.term_dict
.ord_to_term(facet_ord as u64, output.inner_buffer_mut());
assert!(found_term, "Term ordinal {} no found.", facet_ord);
}

View File

@@ -52,8 +52,7 @@ impl DeleteQueue {
//
// Past delete operations are not accessible.
pub fn cursor(&self) -> DeleteCursor {
let last_block = self
.inner
let last_block = self.inner
.read()
.expect("Read lock poisoned when opening delete queue cursor")
.last_block
@@ -93,8 +92,7 @@ impl DeleteQueue {
// be some unflushed operations.
//
fn flush(&self) -> Option<Arc<Block>> {
let mut self_wlock = self
.inner
let mut self_wlock = self.inner
.write()
.expect("Failed to acquire write lock on delete queue writer");
@@ -134,8 +132,7 @@ impl From<DeleteQueue> for NextBlock {
impl NextBlock {
fn next_block(&self) -> Option<Arc<Block>> {
{
let next_read_lock = self
.0
let next_read_lock = self.0
.read()
.expect("Failed to acquire write lock in delete queue");
if let InnerNextBlock::Closed(ref block) = *next_read_lock {
@@ -144,8 +141,7 @@ impl NextBlock {
}
let next_block;
{
let mut next_write_lock = self
.0
let mut next_write_lock = self.0
.write()
.expect("Failed to acquire write lock in delete queue");
match *next_write_lock {

View File

@@ -9,7 +9,6 @@ use core::SegmentComponent;
use core::SegmentId;
use core::SegmentMeta;
use core::SegmentReader;
use directory::FileProtection;
use docset::DocSet;
use error::{Error, ErrorKind, Result, ResultExt};
use fastfield::write_delete_bitset;
@@ -216,15 +215,13 @@ pub fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: u64,
) -> Result<Option<FileProtection>> {
let mut file_protect: Option<FileProtection> = None;
) -> Result<()> {
{
if let Some(previous_opstamp) = segment_entry.meta().delete_opstamp() {
if segment_entry.meta().delete_opstamp() == Some(target_opstamp) {
// We are already up-to-date here.
if target_opstamp == previous_opstamp {
return Ok(file_protect);
}
return Ok(());
}
let segment_reader = SegmentReader::open(&segment)?;
let max_doc = segment_reader.max_doc();
@@ -243,6 +240,7 @@ pub fn advance_deletes(
target_opstamp,
)?;
// TODO optimize
for doc in 0u32..max_doc {
if segment_reader.is_deleted(doc) {
delete_bitset.insert(doc as usize);
@@ -251,14 +249,13 @@ pub fn advance_deletes(
let num_deleted_docs = delete_bitset.len();
if num_deleted_docs > 0 {
segment.set_delete_meta(num_deleted_docs as u32, target_opstamp);
file_protect = Some(segment.protect_from_delete(SegmentComponent::DELETE));
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
}
segment_entry.set_meta(segment.meta().clone());
Ok(file_protect)
segment_entry.set_meta((*segment.meta()).clone());
Ok(())
}
fn index_documents(
@@ -299,8 +296,7 @@ fn index_documents(
let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
let mut segment_meta = SegmentMeta::new(segment_id);
segment_meta.set_max_doc(num_docs);
let segment_meta = SegmentMeta::new(segment_id, num_docs);
let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
@@ -342,8 +338,7 @@ impl IndexWriter {
}
drop(self.workers_join_handle);
let result = self
.segment_updater
let result = self.segment_updater
.wait_merging_thread()
.chain_err(|| ErrorKind::ErrorInThread("Failed to join merging thread.".into()));
@@ -448,7 +443,9 @@ impl IndexWriter {
}
/// Merges a given list of segments
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Receiver<SegmentMeta> {
///
/// `segment_ids` is required to be non-empty.
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
self.segment_updater.start_merge(segment_ids)
}
@@ -488,8 +485,7 @@ impl IndexWriter {
let document_receiver = self.document_receiver.clone();
// take the directory lock to create a new index_writer.
let directory_lock = self
._directory_lock
let directory_lock = self._directory_lock
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");

View File

@@ -116,15 +116,17 @@ mod tests {
assert!(result_list.is_empty());
}
fn seg_meta(num_docs: u32) -> SegmentMeta {
let mut segment_metas = SegmentMeta::new(SegmentId::generate_random());
segment_metas.set_max_doc(num_docs);
segment_metas
fn create_random_segment_meta(num_docs: u32) -> SegmentMeta {
SegmentMeta::new(SegmentId::generate_random(), num_docs)
}
#[test]
fn test_log_merge_policy_pair() {
let test_input = vec![seg_meta(10), seg_meta(10), seg_meta(10)];
let test_input = vec![
create_random_segment_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(10),
];
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
assert_eq!(result_list.len(), 1);
}
@@ -137,17 +139,17 @@ mod tests {
// * one with the 3 * 1000-docs segments
// no MergeCandidate expected for the 2 * 10_000-docs segments as min_merge_size=3
let test_input = vec![
seg_meta(10),
seg_meta(10),
seg_meta(10),
seg_meta(1000),
seg_meta(1000),
seg_meta(1000),
seg_meta(10000),
seg_meta(10000),
seg_meta(10),
seg_meta(10),
seg_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(1000),
create_random_segment_meta(1000),
create_random_segment_meta(1000),
create_random_segment_meta(10000),
create_random_segment_meta(10000),
create_random_segment_meta(10),
create_random_segment_meta(10),
create_random_segment_meta(10),
];
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
assert_eq!(result_list.len(), 2);
@@ -157,12 +159,12 @@ mod tests {
fn test_log_merge_policy_within_levels() {
// multiple levels all get merged correctly
let test_input = vec![
seg_meta(10), // log2(10) = ~3.32 (> 3.58 - 0.75)
seg_meta(11), // log2(11) = ~3.46
seg_meta(12), // log2(12) = ~3.58
seg_meta(800), // log2(800) = ~9.64 (> 9.97 - 0.75)
seg_meta(1000), // log2(1000) = ~9.97
seg_meta(1000),
create_random_segment_meta(10), // log2(10) = ~3.32 (> 3.58 - 0.75)
create_random_segment_meta(11), // log2(11) = ~3.46
create_random_segment_meta(12), // log2(12) = ~3.58
create_random_segment_meta(800), // log2(800) = ~9.64 (> 9.97 - 0.75)
create_random_segment_meta(1000), // log2(1000) = ~9.97
create_random_segment_meta(1000),
]; // log2(1000) = ~9.97
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
assert_eq!(result_list.len(), 2);
@@ -171,12 +173,12 @@ mod tests {
fn test_log_merge_policy_small_segments() {
// segments under min_layer_size are merged together
let test_input = vec![
seg_meta(1),
seg_meta(1),
seg_meta(1),
seg_meta(2),
seg_meta(2),
seg_meta(2),
create_random_segment_meta(1),
create_random_segment_meta(1),
create_random_segment_meta(1),
create_random_segment_meta(2),
create_random_segment_meta(2),
create_random_segment_meta(2),
];
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
assert_eq!(result_list.len(), 1);

View File

@@ -440,8 +440,7 @@ impl IndexMerger {
) -> Result<Option<TermOrdinalMapping>> {
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
let field_readers = self
.readers
let field_readers = self.readers
.iter()
.map(|reader| reader.inverted_index(indexed_field))
.collect::<Vec<_>>();
@@ -737,6 +736,7 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
@@ -980,6 +980,7 @@ mod tests {
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index.load_searchers().unwrap();
@@ -1076,6 +1077,7 @@ mod tests {
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index.load_searchers().unwrap();
@@ -1129,6 +1131,7 @@ mod tests {
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index.load_searchers().unwrap();
@@ -1219,6 +1222,7 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
@@ -1290,6 +1294,7 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
@@ -1392,6 +1397,7 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();

View File

@@ -2,6 +2,8 @@ use super::segment_register::SegmentRegister;
use core::SegmentId;
use core::SegmentMeta;
use core::{LOCKFILE_FILEPATH, META_FILEPATH};
use error::ErrorKind;
use error::Result as TantivyResult;
use indexer::delete_queue::DeleteCursor;
use indexer::SegmentEntry;
use std::collections::hash_set::HashSet;
@@ -64,8 +66,9 @@ impl SegmentManager {
/// Returns all of the segment entries (committed or uncommitted)
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
let mut segment_entries = self.read().uncommitted.segment_entries();
segment_entries.extend(self.read().committed.segment_entries());
let registers_lock = self.read();
let mut segment_entries = registers_lock.uncommitted.segment_entries();
segment_entries.extend(registers_lock.committed.segment_entries());
segment_entries
}
@@ -76,32 +79,15 @@ impl SegmentManager {
}
pub fn list_files(&self) -> HashSet<PathBuf> {
let registers_lock = self.read();
let mut files = HashSet::new();
files.insert(META_FILEPATH.clone());
files.insert(LOCKFILE_FILEPATH.clone());
let segment_metas: Vec<SegmentMeta> = registers_lock
.committed
.get_all_segments()
.into_iter()
.chain(registers_lock.uncommitted.get_all_segments().into_iter())
.chain(registers_lock.writing.iter().cloned().map(SegmentMeta::new))
.collect();
for segment_meta in segment_metas {
for segment_meta in SegmentMeta::all() {
files.extend(segment_meta.list_files());
}
files
}
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
let registers = self.read();
registers
.committed
.segment_entry(segment_id)
.or_else(|| registers.uncommitted.segment_entry(segment_id))
}
// Lock poisoning should never happen :
// The lock is acquired and released within this class,
// and the operations cannot panic.
@@ -126,19 +112,38 @@ impl SegmentManager {
}
}
pub fn start_merge(&self, segment_ids: &[SegmentId]) {
/// Marks a list of segments as in merge.
///
/// Returns an error if some segments are missing, or if
/// the `segment_ids` are not either all committed or all
/// uncommitted.
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
let mut registers_lock = self.write();
let mut segment_entries = vec![];
if registers_lock.uncommitted.contains_all(segment_ids) {
for segment_id in segment_ids {
registers_lock.uncommitted.start_merge(segment_id);
let segment_entry = registers_lock.uncommitted
.start_merge(segment_id)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
}
} else if registers_lock.committed.contains_all(segment_ids) {
for segment_id in segment_ids {
let segment_entry = registers_lock.committed
.start_merge(segment_id)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
}
for segment_id in segment_ids {
registers_lock.committed.start_merge(segment_id);
}
} else {
error!("Merge operation sent for segments that are not all uncommited or commited.");
let error_msg = "Merge operation sent for segments that are not \
all uncommited or commited."
.to_string();
bail!(ErrorKind::InvalidArgument(error_msg))
}
Ok(segment_entries)
}
pub fn cancel_merge(

View File

@@ -3,8 +3,7 @@ use core::SegmentMeta;
use indexer::delete_queue::DeleteCursor;
use indexer::segment_entry::SegmentEntry;
use std::collections::HashMap;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::fmt::{self, Debug, Formatter};
/// The segment register keeps track
/// of the list of segment, their size as well
@@ -39,13 +38,6 @@ impl SegmentRegister {
self.segment_states.len()
}
pub fn get_all_segments(&self) -> Vec<SegmentMeta> {
self.segment_states
.values()
.map(|segment_entry| segment_entry.meta().clone())
.collect()
}
pub fn get_mergeable_segments(&self) -> Vec<SegmentMeta> {
self.segment_states
.values()
@@ -59,8 +51,7 @@ impl SegmentRegister {
}
pub fn segment_metas(&self) -> Vec<SegmentMeta> {
let mut segment_ids: Vec<SegmentMeta> = self
.segment_states
let mut segment_ids: Vec<SegmentMeta> = self.segment_states
.values()
.map(|segment_entry| segment_entry.meta().clone())
.collect();
@@ -68,10 +59,6 @@ impl SegmentRegister {
segment_ids
}
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states.get(segment_id).cloned()
}
pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool {
segment_ids
.iter()
@@ -94,11 +81,13 @@ impl SegmentRegister {
.cancel_merge();
}
pub fn start_merge(&mut self, segment_id: &SegmentId) {
self.segment_states
.get_mut(segment_id)
.expect("Received a merge notification for a segment that is not registered")
.start_merge();
pub fn start_merge(&mut self, segment_id: &SegmentId) -> Option<SegmentEntry> {
if let Some(segment_entry) = self.segment_states.get_mut(segment_id) {
segment_entry.start_merge();
Some(segment_entry.clone())
} else {
None
}
}
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
@@ -110,6 +99,11 @@ impl SegmentRegister {
}
SegmentRegister { segment_states }
}
#[cfg(test)]
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states.get(segment_id).cloned()
}
}
#[cfg(test)]
@@ -138,7 +132,7 @@ mod tests {
let segment_id_merged = SegmentId::generate_random();
{
let segment_meta = SegmentMeta::new(segment_id_a);
let segment_meta = SegmentMeta::new(segment_id_a, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
@@ -151,7 +145,7 @@ mod tests {
);
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
{
let segment_meta = SegmentMeta::new(segment_id_b);
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
@@ -181,7 +175,7 @@ mod tests {
segment_register.remove_segment(&segment_id_a);
segment_register.remove_segment(&segment_id_b);
{
let segment_meta_merged = SegmentMeta::new(segment_id_merged);
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32);
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}

View File

@@ -7,11 +7,11 @@ use core::SegmentMeta;
use core::SerializableSegment;
use core::META_FILEPATH;
use directory::Directory;
use directory::FileProtection;
use error::{Error, ErrorKind, Result};
use error::{Error, ErrorKind, Result, ResultExt};
use futures::oneshot;
use futures::sync::oneshot::Receiver;
use futures::Future;
use futures_cpupool::Builder as CpuPoolBuilder;
use futures_cpupool::CpuFuture;
use futures_cpupool::CpuPool;
use indexer::delete_queue::DeleteCursor;
@@ -29,8 +29,7 @@ use std::collections::HashMap;
use std::io::Write;
use std::mem;
use std::ops::DerefMut;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
@@ -87,38 +86,19 @@ pub fn save_metas(
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
fn perform_merge(
segment_ids: &[SegmentId],
segment_updater: &SegmentUpdater,
index: &Index,
mut segment_entries: Vec<SegmentEntry>,
mut merged_segment: Segment,
target_opstamp: u64,
) -> Result<SegmentEntry> {
// first we need to apply deletes to our segment.
info!("Start merge: {:?}", segment_ids);
let index = &segment_updater.0.index;
// TODO add logging
let schema = index.schema();
let mut segment_entries = vec![];
let mut file_protections: Vec<FileProtection> = vec![];
for segment_id in segment_ids {
if let Some(mut segment_entry) = segment_updater.0.segment_manager.segment_entry(segment_id)
{
let segment = index.segment(segment_entry.meta().clone());
if let Some(file_protection) =
advance_deletes(segment, &mut segment_entry, target_opstamp)?
{
file_protections.push(file_protection);
}
segment_entries.push(segment_entry);
} else {
error!("Error, had to abort merge as some of the segment is not managed anymore.");
let msg = format!(
"Segment {:?} requested for merge is not managed.",
segment_id
);
bail!(ErrorKind::InvalidArgument(msg));
}
for segment_entry in &mut segment_entries {
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
}
let delete_cursor = segment_entries[0].delete_cursor().clone();
@@ -135,13 +115,13 @@ fn perform_merge(
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)
.expect("Creating index serializer failed");
.chain_err(|| "Creating index serializer failed")?;
let num_docs = merger
.write(segment_serializer)
.expect("Serializing merged index failed");
let mut segment_meta = SegmentMeta::new(merged_segment.id());
segment_meta.set_max_doc(num_docs);
.chain_err(|| "Serializing merged index failed")?;
let segment_meta = SegmentMeta::new(merged_segment.id(), num_docs);
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None);
Ok(after_merge_segment_entry)
@@ -167,8 +147,12 @@ impl SegmentUpdater {
) -> Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let pool = CpuPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)
.create();
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
pool: CpuPool::new(1),
pool,
index,
segment_manager,
merge_policy: RwLock::new(Box::new(DefaultMergePolicy::default())),
@@ -283,69 +267,85 @@ impl SegmentUpdater {
}).wait()
}
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Receiver<SegmentMeta> {
self.0.segment_manager.start_merge(segment_ids);
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
//let future_merged_segment = */
let segment_ids_vec = segment_ids.to_vec();
self.run_async(move |segment_updater| {
segment_updater.start_merge_impl(&segment_ids_vec[..])
}).wait()?
}
// `segment_ids` is required to be non-empty.
fn start_merge_impl(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty.");
let segment_updater_clone = self.clone();
let segment_entries: Vec<SegmentEntry> = self.0.segment_manager.start_merge(segment_ids)?;
let segment_ids_vec = segment_ids.to_vec();
let merging_thread_id = self.get_merging_thread_id();
info!(
"Starting merge thread #{} - {:?}",
merging_thread_id, segment_ids
);
let (merging_future_send, merging_future_recv) = oneshot();
if segment_ids.is_empty() {
return merging_future_recv;
}
let target_opstamp = self.0.stamper.stamp();
let merging_join_handle = thread::spawn(move || {
// first we need to apply deletes to our segment.
let merged_segment = segment_updater_clone.new_segment();
let merged_segment_id = merged_segment.id();
let merge_result = perform_merge(
&segment_ids_vec,
&segment_updater_clone,
merged_segment,
target_opstamp,
);
match merge_result {
Ok(after_merge_segment_entry) => {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(segment_ids_vec, after_merge_segment_entry)
.expect("Segment updater thread is corrupted.");
// first we need to apply deletes to our segment.
let merging_join_handle = thread::Builder::new()
.name(format!("mergingthread-{}", merging_thread_id))
.spawn(move || {
// first we need to apply deletes to our segment.
let merged_segment = segment_updater_clone.new_segment();
let merged_segment_id = merged_segment.id();
let merge_result = perform_merge(
&segment_updater_clone.0.index,
segment_entries,
merged_segment,
target_opstamp,
);
// the future may fail if the listener of the oneshot future
// has been destroyed.
//
// This is not a problem here, so we just ignore any
// possible error.
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(e) => {
error!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
match merge_result {
Ok(after_merge_segment_entry) => {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(segment_ids_vec, after_merge_segment_entry)
.expect("Segment updater thread is corrupted.");
// the future may fail if the listener of the oneshot future
// has been destroyed.
//
// This is not a problem here, so we just ignore any
// possible error.
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(e) => {
warn!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
// merging_future_send will be dropped, sending an error to the future.
}
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
// merging_future_send will be dropped, sending an error to the future.
}
}
segment_updater_clone
.0
.merging_threads
.write()
.unwrap()
.remove(&merging_thread_id);
Ok(())
});
segment_updater_clone
.0
.merging_threads
.write()
.unwrap()
.remove(&merging_thread_id);
Ok(())
})
.expect("Failed to spawn a thread.");
self.0
.merging_threads
.write()
.unwrap()
.insert(merging_thread_id, merging_join_handle);
merging_future_recv
Ok(merging_future_recv)
}
fn consider_merge_options(&self) {
@@ -358,8 +358,18 @@ impl SegmentUpdater {
let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
for MergeCandidate(segment_metas) in merge_candidates {
if let Err(e) = self.start_merge(&segment_metas).fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
match self.start_merge_impl(&segment_metas) {
Ok(merge_future) => {
if let Err(e) = merge_future.fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
}
}
Err(err) => {
warn!(
"Starting the merge failed for the following reason. This is not fatal. {}",
err
);
}
}
}
}
@@ -382,7 +392,6 @@ impl SegmentUpdater {
self.run_async(move |segment_updater| {
info!("End merge {:?}", after_merge_segment_entry.meta());
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
let mut _file_protection_opt = None;
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater
.0
@@ -393,29 +402,22 @@ impl SegmentUpdater {
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.0.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
match advance_deletes(
segment,
&mut after_merge_segment_entry,
committed_opstamp,
) {
Ok(file_protection_opt_res) => {
_file_protection_opt = file_protection_opt_res;
}
Err(e) => {
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
before_merge_segment_ids, e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater.cancel_merge(
&before_merge_segment_ids,
after_merge_segment_entry.segment_id(),
);
return;
if let Err(e) =
advance_deletes(segment, &mut after_merge_segment_entry, committed_opstamp)
{
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
before_merge_segment_ids, e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater.cancel_merge(
&before_merge_segment_ids,
after_merge_segment_entry.segment_id(),
);
return;
}
}
}

View File

@@ -180,6 +180,9 @@ mod macros;
pub use error::{Error, ErrorKind, ResultExt};
extern crate census;
extern crate owned_read;
/// Tantivy result.
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -94,8 +94,7 @@ impl MultiFieldPostingsWriter {
&self,
serializer: &mut InvertedIndexSerializer,
) -> Result<HashMap<Field, HashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self
.term_index
let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self.term_index
.iter()
.map(|(term_bytes, addr, bucket_id)| (term_bytes, addr, bucket_id as UnorderedTermId))
.collect();

View File

@@ -107,8 +107,7 @@ impl Recorder for TermFrequencyRecorder {
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
// the last document has not been closed...
// its term freq is self.current_tf.
let mut doc_iter = self
.stack
let mut doc_iter = self.stack
.iter(heap)
.chain(Some(self.current_tf).into_iter());

View File

@@ -2,15 +2,14 @@ use compression::{BlockDecoder, CompressedIntStream, VIntDecoder, COMPRESSION_BL
use DocId;
use common::BitSet;
use common::CountingWriter;
use common::HasLen;
use compression::compressed_block_size;
use directory::{ReadOnlySource, SourceRead};
use docset::{DocSet, SkipResult};
use fst::Streamer;
use postings::serializer::PostingsSerializer;
use postings::FreqReadingOption;
use postings::Postings;
use owned_read::OwnedRead;
struct PositionComputer {
// store the amount of position int
@@ -78,9 +77,9 @@ impl SegmentPostings {
/// and returns a `SegmentPostings` object that embeds a
/// buffer with the serialized data.
pub fn create_from_docs(docs: &[u32]) -> SegmentPostings {
let mut counting_writer = CountingWriter::wrap(Vec::new());
let mut buffer = Vec::new();
{
let mut postings_serializer = PostingsSerializer::new(&mut counting_writer, false);
let mut postings_serializer = PostingsSerializer::new(&mut buffer, false);
for &doc in docs {
postings_serializer.write_doc(doc, 1u32).unwrap();
}
@@ -88,13 +87,9 @@ impl SegmentPostings {
.close_term()
.expect("In memory Serialization should never fail.");
}
let (buffer, _) = counting_writer
.finish()
.expect("Serializing in a buffer should never fail.");
let data = ReadOnlySource::from(buffer);
let block_segment_postings = BlockSegmentPostings::from_data(
docs.len(),
SourceRead::from(data),
OwnedRead::new(buffer),
FreqReadingOption::NoFreq,
);
SegmentPostings::from_block_postings(block_segment_postings, None)
@@ -306,13 +301,13 @@ pub struct BlockSegmentPostings {
doc_offset: DocId,
num_bitpacked_blocks: usize,
num_vint_docs: usize,
remaining_data: SourceRead,
remaining_data: OwnedRead,
}
impl BlockSegmentPostings {
pub(crate) fn from_data(
doc_freq: usize,
data: SourceRead,
data: OwnedRead,
freq_reading_option: FreqReadingOption,
) -> BlockSegmentPostings {
let num_bitpacked_blocks: usize = (doc_freq as usize) / COMPRESSION_BLOCK_SIZE;
@@ -339,7 +334,7 @@ impl BlockSegmentPostings {
// # Warning
//
// This does not reset the positions list.
pub(crate) fn reset(&mut self, doc_freq: usize, postings_data: SourceRead) {
pub(crate) fn reset(&mut self, doc_freq: usize, postings_data: OwnedRead) {
let num_binpacked_blocks: usize = doc_freq / COMPRESSION_BLOCK_SIZE;
let num_vint_docs = doc_freq & (COMPRESSION_BLOCK_SIZE - 1);
self.num_bitpacked_blocks = num_binpacked_blocks;
@@ -399,8 +394,7 @@ impl BlockSegmentPostings {
/// Returns false iff there was no remaining blocks.
pub fn advance(&mut self) -> bool {
if self.num_bitpacked_blocks > 0 {
let num_consumed_bytes = self
.doc_decoder
let num_consumed_bytes = self.doc_decoder
.uncompress_block_sorted(self.remaining_data.as_ref(), self.doc_offset);
self.remaining_data.advance(num_consumed_bytes);
match self.freq_reading_option {
@@ -410,8 +404,7 @@ impl BlockSegmentPostings {
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self
.freq_decoder
let num_consumed_bytes = self.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref());
self.remaining_data.advance(num_consumed_bytes);
}
@@ -451,7 +444,7 @@ impl BlockSegmentPostings {
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option: FreqReadingOption::NoFreq,
remaining_data: From::from(ReadOnlySource::empty()),
remaining_data: OwnedRead::new(vec![]),
doc_offset: 0,
doc_freq: 0,
}

View File

@@ -160,8 +160,7 @@ impl<'a> FieldSerializer<'a> {
}
fn current_term_info(&self) -> TermInfo {
let (filepos, offset) = self
.positions_serializer_opt
let (filepos, offset) = self.positions_serializer_opt
.as_ref()
.map(|positions_serializer| positions_serializer.addr())
.unwrap_or((0u64, 0u8));
@@ -240,13 +239,60 @@ impl<'a> FieldSerializer<'a> {
}
}
struct Block {
doc_ids: [DocId; COMPRESSION_BLOCK_SIZE],
term_freqs: [u32; COMPRESSION_BLOCK_SIZE],
len: usize
}
impl Block {
fn new() -> Self {
Block {
doc_ids: [0u32; COMPRESSION_BLOCK_SIZE],
term_freqs: [0u32; COMPRESSION_BLOCK_SIZE],
len: 0
}
}
fn doc_ids(&self) -> &[DocId] {
&self.doc_ids[..self.len]
}
fn term_freqs(&self) -> &[u32] {
&self.term_freqs[..self.len]
}
fn clear(&mut self) {
self.len = 0;
}
fn append_doc(&mut self, doc: DocId, term_freq: u32) {
let len = self.len;
self.doc_ids[len] = doc;
self.term_freqs[len] = term_freq;
self.len = len + 1;
}
fn is_full(&self) -> bool {
self.len == COMPRESSION_BLOCK_SIZE
}
fn is_empty(&self) -> bool {
self.len == 0
}
fn last_doc(&self) -> DocId {
assert_eq!(self.len, COMPRESSION_BLOCK_SIZE);
self.doc_ids[COMPRESSION_BLOCK_SIZE - 1]
}
}
pub struct PostingsSerializer<W: Write> {
postings_write: CountingWriter<W>,
last_doc_id_encoded: u32,
block_encoder: BlockEncoder,
doc_ids: Vec<DocId>,
term_freqs: Vec<u32>,
block: Box<Block>,
termfreq_enabled: bool,
}
@@ -257,42 +303,41 @@ impl<W: Write> PostingsSerializer<W> {
postings_write: CountingWriter::wrap(write),
block_encoder: BlockEncoder::new(),
doc_ids: vec![],
term_freqs: vec![],
block: Box::new(Block::new()),
last_doc_id_encoded: 0u32,
termfreq_enabled,
}
}
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) -> io::Result<()> {
self.doc_ids.push(doc_id);
if self.termfreq_enabled {
self.term_freqs.push(term_freq as u32);
fn write_block(&mut self) -> io::Result<()> {
{
// encode the doc ids
let block_encoded: &[u8] = self.block_encoder
.compress_block_sorted(&self.block.doc_ids(), self.last_doc_id_encoded);
self.last_doc_id_encoded = self.block.last_doc();
self.postings_write.write_all(block_encoded)?;
}
if self.doc_ids.len() == COMPRESSION_BLOCK_SIZE {
{
// encode the doc ids
let block_encoded: &[u8] = self
.block_encoder
.compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1];
self.postings_write.write_all(block_encoded)?;
}
if self.termfreq_enabled {
// encode the term_freqs
let block_encoded: &[u8] =
self.block_encoder.compress_block_unsorted(&self.term_freqs);
self.postings_write.write_all(block_encoded)?;
self.term_freqs.clear();
}
self.doc_ids.clear();
if self.termfreq_enabled {
// encode the term_freqs
let block_encoded: &[u8] =
self.block_encoder.compress_block_unsorted(&self.block.term_freqs());
self.postings_write.write_all(block_encoded)?;
}
self.block.clear();
Ok(())
}
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) -> io::Result<()> {
self.block.append_doc(doc_id, term_freq);
if self.block.is_full() {
self.write_block()?;
}
Ok(())
}
pub fn close_term(&mut self) -> io::Result<()> {
if !self.doc_ids.is_empty() {
if !self.block.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
// not a perfect multiple of our block size.
@@ -300,20 +345,17 @@ impl<W: Write> PostingsSerializer<W> {
// In that case, the remaining part is encoded
// using variable int encoding.
{
let block_encoded = self
.block_encoder
.compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded);
let block_encoded = self.block_encoder
.compress_vint_sorted(&self.block.doc_ids(), self.last_doc_id_encoded);
self.postings_write.write_all(block_encoded)?;
self.doc_ids.clear();
}
// ... Idem for term frequencies
if self.termfreq_enabled {
let block_encoded = self
.block_encoder
.compress_vint_unsorted(&self.term_freqs[..]);
let block_encoded = self.block_encoder
.compress_vint_unsorted(self.block.term_freqs());
self.postings_write.write_all(block_encoded)?;
self.term_freqs.clear();
}
self.block.clear();
}
Ok(())
}
@@ -327,8 +369,7 @@ impl<W: Write> PostingsSerializer<W> {
}
fn clear(&mut self) {
self.doc_ids.clear();
self.term_freqs.clear();
self.block.clear();
self.last_doc_id_encoded = 0;
}
}

View File

@@ -41,8 +41,7 @@ impl From<Vec<(Occur, Box<Query>)>> for BooleanQuery {
impl Query for BooleanQuery {
fn weight(&self, searcher: &Searcher, scoring_enabled: bool) -> Result<Box<Weight>> {
let sub_weights = self
.subqueries
let sub_weights = self.subqueries
.iter()
.map(|&(ref occur, ref subquery)| {
Ok((*occur, subquery.weight(searcher, scoring_enabled)?))

View File

@@ -1,11 +1,21 @@
use query::Occur;
use schema::Field;
use schema::Term;
use schema::Type;
use std::fmt;
use std::ops::Bound;
#[derive(Clone)]
pub enum LogicalLiteral {
Term(Term),
Phrase(Vec<Term>),
Range {
field: Field,
value_type: Type,
lower: Bound<Term>,
upper: Bound<Term>,
},
All,
}
#[derive(Clone)]
@@ -54,6 +64,12 @@ impl fmt::Debug for LogicalLiteral {
match *self {
LogicalLiteral::Term(ref term) => write!(formatter, "{:?}", term),
LogicalLiteral::Phrase(ref terms) => write!(formatter, "\"{:?}\"", terms),
LogicalLiteral::Range {
ref lower,
ref upper,
..
} => write!(formatter, "({:?} TO {:?})", lower, upper),
LogicalLiteral::All => write!(formatter, "*"),
}
}
}

View File

@@ -1,29 +1,37 @@
use super::user_input_ast::*;
use combine::char::*;
use combine::*;
use query::query_parser::user_input_ast::UserInputBound;
fn field<I: Stream<Item = char>>() -> impl Parser<Input = I, Output = String> {
(
letter(),
many(satisfy(|c: char| c.is_alphanumeric() || c == '_')),
).map(|(s1, s2): (char, String)| format!("{}{}", s1, s2))
}
fn word<I: Stream<Item = char>>() -> impl Parser<Input = I, Output = String> {
many1(satisfy(|c: char| c.is_alphanumeric()))
}
fn negative_number<I: Stream<Item = char>>() -> impl Parser<Input = I, Output = String> {
(char('-'), many1(satisfy(|c: char| c.is_numeric())))
.map(|(s1, s2): (char, String)| format!("{}{}", s1, s2))
}
fn literal<I>(input: I) -> ParseResult<UserInputAST, I>
where
I: Stream<Item = char>,
{
let term_val = || {
let word = many1(satisfy(|c: char| c.is_alphanumeric()));
let phrase = (char('"'), many1(satisfy(|c| c != '"')), char('"')).map(|(_, s, _)| s);
phrase.or(word)
phrase.or(word())
};
let negative_numbers = (char('-'), many1(satisfy(|c: char| c.is_numeric())))
.map(|(s1, s2): (char, String)| format!("{}{}", s1, s2));
let field = (
letter(),
many(satisfy(|c: char| c.is_alphanumeric() || c == '_')),
).map(|(s1, s2): (char, String)| format!("{}{}", s1, s2));
let term_val_with_field = negative_numbers.or(term_val());
let term_val_with_field = negative_number().or(term_val());
let term_query =
(field, char(':'), term_val_with_field).map(|(field_name, _, phrase)| UserInputLiteral {
(field(), char(':'), term_val_with_field).map(|(field_name, _, phrase)| UserInputLiteral {
field_name: Some(field_name),
phrase,
});
@@ -37,6 +45,36 @@ where
.parse_stream(input)
}
/// Parses a range query such as `field:[a TO b}` or `{1 TO *]`.
///
/// The field prefix is optional; `*` as a bound value denotes an
/// unbounded end. `[`/`]` delimiters are inclusive, `{`/`}` exclusive.
fn range<I: Stream<Item = char>>(input: I) -> ParseResult<UserInputAST, I> {
// A bound value is a word, a negative number, or the wildcard `*`.
let term_val = || {
word().or(negative_number()).or(char('*').map(|_| "*".to_string()))
};
// Lower bound: the opening delimiter decides inclusiveness.
let lower_bound = {
let excl = (char('{'), term_val()).map(|(_, w)| UserInputBound::Exclusive(w));
let incl = (char('['), term_val()).map(|(_, w)| UserInputBound::Inclusive(w));
excl.or(incl)
};
// Upper bound: the closing delimiter decides inclusiveness. Both
// alternatives start with `term_val()`, hence the explicit backtracking.
let upper_bound = {
let excl = (term_val(), char('}')).map(|(w, _)| UserInputBound::Exclusive(w));
let incl = (term_val(), char(']')).map(|(w, _)| UserInputBound::Inclusive(w));
// TODO: this backtracking should be unnecessary
try(excl).or(incl)
};
// `<field>:` prefix (optional), then `<lower> TO <upper>`.
(
optional((field(), char(':')).map(|x| x.0)),
lower_bound,
spaces(),
string("TO"),
spaces(),
upper_bound,
).map(|(field, lower, _, _, _, upper)| UserInputAST::Range {
field,
lower,
upper,
})
.parse_stream(input)
}
fn leaf<I>(input: I) -> ParseResult<UserInputAST, I>
where
I: Stream<Item = char>,
@@ -45,6 +83,8 @@ where
.map(|(_, expr)| UserInputAST::Not(Box::new(expr)))
.or((char('+'), parser(leaf)).map(|(_, expr)| UserInputAST::Must(Box::new(expr))))
.or((char('('), parser(parse_to_ast), char(')')).map(|(_, expr, _)| expr))
.or(char('*').map(|_| UserInputAST::All))
.or(try(parser(range)))
.or(parser(literal))
.parse_stream(input)
}
@@ -91,6 +131,12 @@ mod test {
test_parse_query_to_ast_helper("-abc:toto", "-(abc:\"toto\")");
test_parse_query_to_ast_helper("abc:a b", "(abc:\"a\" \"b\")");
test_parse_query_to_ast_helper("abc:\"a b\"", "abc:\"a b\"");
test_parse_query_to_ast_helper("foo:[1 TO 5]", "foo:[\"1\" TO \"5\"]");
test_parse_query_to_ast_helper("[1 TO 5]", "[\"1\" TO \"5\"]");
test_parse_query_to_ast_helper("foo:{a TO z}", "foo:{\"a\" TO \"z\"}");
test_parse_query_to_ast_helper("foo:[1 TO toto}", "foo:[\"1\" TO \"toto\"}");
test_parse_query_to_ast_helper("foo:[* TO toto}", "foo:[\"*\" TO \"toto\"}");
test_parse_query_to_ast_helper("foo:[1 TO *}", "foo:[\"1\" TO \"*\"}");
test_is_parse_err("abc + ");
}
}

View File

@@ -2,15 +2,19 @@ use super::logical_ast::*;
use super::query_grammar::parse_to_ast;
use super::user_input_ast::*;
use core::Index;
use query::AllQuery;
use query::BooleanQuery;
use query::Occur;
use query::PhraseQuery;
use query::Query;
use query::RangeQuery;
use query::TermQuery;
use schema::IndexRecordOption;
use schema::{Field, Schema};
use schema::{FieldType, Term};
use std::borrow::Cow;
use std::num::ParseIntError;
use std::ops::Bound;
use std::str::FromStr;
use tokenizer::TokenizerManager;
@@ -39,6 +43,9 @@ pub enum QueryParserError {
/// The tokenizer for the given field is unknown
/// The two argument strings are the name of the field, the name of the tokenizer
UnknownTokenizer(String, String),
/// The query contains a range query with a phrase as one of the bounds.
/// Only terms can be used as bounds.
RangeMustNotHavePhrase,
}
impl From<ParseIntError> for QueryParserError {
@@ -66,8 +73,8 @@ impl From<ParseIntError> for QueryParserError {
/// by relevance : The user typically just scans through the first few
/// documents in order of decreasing relevance and will stop when the documents
/// are not relevant anymore.
/// Making it possible to make this behavior customizable is tracked in
/// [issue #27](https://github.com/fulmicoton/tantivy/issues/27).
///
/// Switching to a default of `AND` can be done by calling `.set_conjunction_by_default()`.
///
/// * negative terms: By prepending a term by a `-`, a term can be excluded
/// from the search. This is useful for disambiguating a query.
@@ -75,6 +82,17 @@ impl From<ParseIntError> for QueryParserError {
///
/// * must terms: By prepending a term by a `+`, a term can be made required for the search.
///
/// * phrase terms: Quoted terms become phrase searches on fields that have positions indexed.
/// e.g., `title:"Barack Obama"` will only find documents that have "barack" immediately followed
/// by "obama".
///
/// * range terms: Range searches can be done by specifying the start and end bound. These can be
/// inclusive or exclusive. e.g., `title:[a TO c}` will find all documents whose title contains
/// a word lexicographically between `a` and `c` (inclusive lower bound, exclusive upper bound).
/// Inclusive bounds are `[]`, exclusive are `{}`.
///
/// * all docs query: A plain `*` will match all documents in the index.
///
pub struct QueryParser {
schema: Schema,
default_fields: Vec<Field>,
@@ -155,11 +173,12 @@ impl QueryParser {
}
Ok(ast)
}
fn compute_logical_ast_for_leaf(
fn compute_terms_for_string(
&self,
field: Field,
phrase: &str,
) -> Result<Option<LogicalLiteral>, QueryParserError> {
) -> Result<Vec<Term>, QueryParserError> {
let field_entry = self.schema.get_field_entry(field);
let field_type = field_entry.field_type();
if !field_type.is_indexed() {
@@ -170,12 +189,12 @@ impl QueryParser {
FieldType::I64(_) => {
let val: i64 = i64::from_str(phrase)?;
let term = Term::from_field_i64(field, val);
Ok(Some(LogicalLiteral::Term(term)))
Ok(vec![term])
}
FieldType::U64(_) => {
let val: u64 = u64::from_str(phrase)?;
let term = Term::from_field_u64(field, val);
Ok(Some(LogicalLiteral::Term(term)))
Ok(vec![term])
}
FieldType::Str(ref str_options) => {
if let Some(option) = str_options.get_indexing_options() {
@@ -194,17 +213,15 @@ impl QueryParser {
terms.push(term);
});
if terms.is_empty() {
Ok(None)
Ok(vec![])
} else if terms.len() == 1 {
Ok(Some(LogicalLiteral::Term(
terms.into_iter().next().unwrap(),
)))
Ok(terms)
} else {
let field_entry = self.schema.get_field_entry(field);
let field_type = field_entry.field_type();
if let Some(index_record_option) = field_type.get_index_record_option() {
if index_record_option.has_positions() {
Ok(Some(LogicalLiteral::Phrase(terms)))
Ok(terms)
} else {
let fieldname = self.schema.get_field_name(field).to_string();
Err(QueryParserError::FieldDoesNotHavePositionsIndexed(
@@ -223,10 +240,7 @@ impl QueryParser {
))
}
}
FieldType::HierarchicalFacet => {
let term = Term::from_field_text(field, phrase);
Ok(Some(LogicalLiteral::Term(term)))
}
FieldType::HierarchicalFacet => Ok(vec![Term::from_field_text(field, phrase)]),
FieldType::Bytes => {
let field_name = self.schema.get_field_name(field).to_string();
Err(QueryParserError::FieldNotIndexed(field_name))
@@ -234,6 +248,21 @@ impl QueryParser {
}
}
/// Analyzes `phrase` against `field` and wraps the resulting terms in the
/// appropriate logical literal: `None` when analysis yields no terms, a
/// single `Term` literal for one term, and a `Phrase` literal otherwise.
fn compute_logical_ast_for_leaf(
    &self,
    field: Field,
    phrase: &str,
) -> Result<Option<LogicalLiteral>, QueryParserError> {
    let mut terms = self.compute_terms_for_string(field, phrase)?;
    let literal = if terms.is_empty() {
        None
    } else if terms.len() == 1 {
        // Exactly one term: pop it out without re-allocating.
        Some(LogicalLiteral::Term(terms.pop().unwrap()))
    } else {
        Some(LogicalLiteral::Phrase(terms))
    };
    Ok(literal)
}
fn default_occur(&self) -> Occur {
if self.conjunction_by_default {
Occur::Must
@@ -242,6 +271,37 @@ impl QueryParser {
}
}
/// Converts a user-supplied range bound into a typed `Bound<Term>` for
/// `field`.
///
/// `*` maps to `Bound::Unbounded`. Otherwise the bound text must analyze
/// to exactly one term; multiple terms (a phrase) are rejected with
/// `QueryParserError::RangeMustNotHavePhrase`.
fn resolve_bound(&self, field: Field, bound: &UserInputBound) -> Result<Bound<Term>, QueryParserError> {
    let bound_text = bound.term_str();
    if bound_text == "*" {
        return Ok(Bound::Unbounded);
    }
    let mut terms = self.compute_terms_for_string(field, bound_text)?;
    if terms.len() != 1 {
        return Err(QueryParserError::RangeMustNotHavePhrase);
    }
    // Length was checked above, so `pop` cannot fail.
    let term = terms.pop().unwrap();
    Ok(match *bound {
        UserInputBound::Inclusive(_) => Bound::Included(term),
        UserInputBound::Exclusive(_) => Bound::Excluded(term),
    })
}
/// Resolves the optional field name of a query clause to concrete fields.
///
/// With an explicit field name, returns that single resolved field; with
/// none, returns the parser's default fields, or
/// `QueryParserError::NoDefaultFieldDeclared` if there are none.
fn resolved_fields(
    &self,
    given_field: &Option<String>,
) -> Result<Cow<[Field]>, QueryParserError> {
    match *given_field {
        Some(ref field_name) => {
            let field = self.resolve_field_name(field_name)?;
            Ok(Cow::Owned(vec![field]))
        }
        None if self.default_fields.is_empty() => {
            Err(QueryParserError::NoDefaultFieldDeclared)
        }
        None => Ok(Cow::Borrowed(&self.default_fields[..])),
    }
}
fn compute_logical_ast_with_occur(
&self,
user_input_ast: UserInputAST,
@@ -265,6 +325,41 @@ impl QueryParser {
let (occur, logical_sub_queries) = self.compute_logical_ast_with_occur(*subquery)?;
Ok((compose_occur(Occur::Must, occur), logical_sub_queries))
}
UserInputAST::Range {
field,
lower,
upper,
} => {
let fields = self.resolved_fields(&field)?;
let mut clauses = fields
.iter()
.map(|&field| {
let field_entry = self.schema.get_field_entry(field);
let value_type = field_entry.field_type().value_type();
Ok(LogicalAST::Leaf(Box::new(LogicalLiteral::Range {
field,
value_type,
lower: self.resolve_bound(field, &lower)?,
upper: self.resolve_bound(field, &upper)?,
})))
})
.collect::<Result<Vec<_>, QueryParserError>>()?;
let result_ast = if clauses.len() == 1 {
clauses.pop().unwrap()
} else {
LogicalAST::Clause(
clauses
.into_iter()
.map(|clause| (Occur::Should, clause))
.collect(),
)
};
Ok((Occur::Should, result_ast))
}
UserInputAST::All => Ok((
Occur::Should,
LogicalAST::Leaf(Box::new(LogicalLiteral::All)),
)),
UserInputAST::Leaf(literal) => {
let term_phrases: Vec<(Field, String)> = match literal.field_name {
Some(ref field_name) => {
@@ -327,6 +422,13 @@ fn convert_literal_to_query(logical_literal: LogicalLiteral) -> Box<Query> {
match logical_literal {
LogicalLiteral::Term(term) => Box::new(TermQuery::new(term, IndexRecordOption::WithFreqs)),
LogicalLiteral::Phrase(terms) => Box::new(PhraseQuery::new(terms)),
LogicalLiteral::Range {
field,
value_type,
lower,
upper,
} => Box::new(RangeQuery::new_term_bounds(field, value_type, lower, upper)),
LogicalLiteral::All => Box::new(AllQuery),
}
}
@@ -511,6 +613,42 @@ mod test {
Term([0, 0, 0, 0, 98])]\"",
false,
);
test_parse_query_to_logical_ast_helper(
"title:[a TO b]",
"(Included(Term([0, 0, 0, 0, 97])) TO \
Included(Term([0, 0, 0, 0, 98])))",
false,
);
test_parse_query_to_logical_ast_helper(
"[a TO b]",
"((Included(Term([0, 0, 0, 0, 97])) TO \
Included(Term([0, 0, 0, 0, 98]))) \
(Included(Term([0, 0, 0, 1, 97])) TO \
Included(Term([0, 0, 0, 1, 98]))))",
false,
);
test_parse_query_to_logical_ast_helper(
"title:{titi TO toto}",
"(Excluded(Term([0, 0, 0, 0, 116, 105, 116, 105])) TO \
Excluded(Term([0, 0, 0, 0, 116, 111, 116, 111])))",
false,
);
test_parse_query_to_logical_ast_helper(
"title:{* TO toto}",
"(Unbounded TO \
Excluded(Term([0, 0, 0, 0, 116, 111, 116, 111])))",
false,
);
test_parse_query_to_logical_ast_helper(
"title:{titi TO *}",
"(Excluded(Term([0, 0, 0, 0, 116, 105, 116, 105])) TO Unbounded)",
false,
);
test_parse_query_to_logical_ast_helper(
"*",
"*",
false,
);
}
#[test]

View File

@@ -14,10 +14,44 @@ impl fmt::Debug for UserInputLiteral {
}
}
/// One bound of a user-supplied range query, carrying its raw term text.
pub enum UserInputBound {
/// The bound value is part of the range (`[` / `]` syntax).
Inclusive(String),
/// The bound value is excluded from the range (`{` / `}` syntax).
Exclusive(String),
}
impl UserInputBound {
// Writes this bound in lower-bound position: the opening delimiter
// (`[` inclusive, `{` exclusive) followed by the quoted term.
fn display_lower(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
UserInputBound::Inclusive(ref word) => write!(formatter, "[\"{}\"", word),
UserInputBound::Exclusive(ref word) => write!(formatter, "{{\"{}\"", word),
}
}
// Writes this bound in upper-bound position: the quoted term followed
// by the closing delimiter (`]` inclusive, `}` exclusive).
fn display_upper(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
UserInputBound::Inclusive(ref word) => write!(formatter, "\"{}\"]", word),
UserInputBound::Exclusive(ref word) => write!(formatter, "\"{}\"}}", word),
}
}
/// Returns the bound's raw term string, regardless of inclusiveness.
pub fn term_str(&self) -> &str {
match *self {
UserInputBound::Inclusive(ref contents) => contents,
UserInputBound::Exclusive(ref contents) => contents,
}
}
}
/// Untyped AST produced by the query grammar, before field names and
/// value types are resolved against the schema.
pub enum UserInputAST {
/// A sequence of sub-queries (boolean clause).
Clause(Vec<Box<UserInputAST>>),
/// A sub-query that must not match (`-` prefix).
Not(Box<UserInputAST>),
/// A sub-query that is required to match (`+` prefix).
Must(Box<UserInputAST>),
/// A range query such as `field:[a TO b}`; `field` is `None` when the
/// range targets the default fields.
Range {
field: Option<String>,
lower: UserInputBound,
upper: UserInputBound,
},
/// The match-all query (`*`).
All,
/// A term or phrase literal.
Leaf(Box<UserInputLiteral>),
}
@@ -45,6 +79,20 @@ impl fmt::Debug for UserInputAST {
Ok(())
}
UserInputAST::Not(ref subquery) => write!(formatter, "-({:?})", subquery),
UserInputAST::Range {
ref field,
ref lower,
ref upper,
} => {
if let &Some(ref field) = field {
write!(formatter, "{}:", field)?;
}
lower.display_lower(formatter)?;
write!(formatter, " TO ")?;
upper.display_upper(formatter)?;
Ok(())
}
UserInputAST::All => write!(formatter, "*"),
UserInputAST::Leaf(ref subquery) => write!(formatter, "{:?}", subquery),
}
}

View File

@@ -89,6 +89,28 @@ pub struct RangeQuery {
}
impl RangeQuery {
/// Creates a new `RangeQuery` from bounded start and end terms.
///
/// If the value type is not correct, something may go terribly wrong when
/// the `Weight` object is created.
///
/// # Panics
///
/// Panics if either bound carries a term whose field is not `field`.
pub fn new_term_bounds(
field: Field,
value_type: Type,
left_bound: Bound<Term>,
right_bound: Bound<Term>,
) -> RangeQuery {
// Each bound term must belong to `field`; only its raw value bytes
// are stored in the query.
let verify_and_unwrap_term = |val: &Term| {
assert_eq!(field, val.field());
val.value_bytes().to_owned()
};
RangeQuery {
field,
value_type,
left_bound: map_bound(&left_bound, &verify_and_unwrap_term),
right_bound: map_bound(&right_bound, &verify_and_unwrap_term),
}
}
/// Creates a new `RangeQuery` over a `i64` field.
///
/// If the field is not of the type `i64`, tantivy

View File

@@ -72,8 +72,7 @@ impl<T: BinarySerializable> SkipListBuilder<T> {
let mut skip_pointer = self.data_layer.insert(key, dest)?;
loop {
skip_pointer = match skip_pointer {
Some((skip_doc_id, skip_offset)) => self
.get_skip_layer(layer_id)
Some((skip_doc_id, skip_offset)) => self.get_skip_layer(layer_id)
.insert(skip_doc_id, &skip_offset)?,
None => {
return Ok(());

View File

@@ -164,8 +164,7 @@ impl TermDictionary {
let fst = self.fst_index.as_fst();
let mut node = fst.root();
while ord != 0 || !node.is_final() {
if let Some(transition) = node
.transitions()
if let Some(transition) = node.transitions()
.take_while(|transition| transition.out.value() <= ord)
.last()
{