Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-03 15:52:55 +00:00)

Compare commits (12 commits)
| SHA1 |
|---|
| 31655e92d7 |
| 6b8d76685a |
| ce5683fc6a |
| 5205579db6 |
| d056ae60dc |
| af9280c95f |
| 2e538ce6e6 |
| 00466d2b08 |
| 8ebbf6b336 |
| 1ce36bb211 |
| 2ac43bf21b |
| 3fd8c2aa5a |
CHANGELOG.md (13 changed lines)
@@ -1,7 +1,18 @@
+Tantivy 0.6.1
+=========================
+
+- Bugfix #324. GC was removing files that were still in use.
+- Added support for parsing AllQuery and RangeQuery via QueryParser
+  - AllQuery: `*`
+  - RangeQuery:
+    - Inclusive `field:[startIncl to endIncl]`
+    - Exclusive `field:{startExcl to endExcl}`
+    - Mixed `field:[startIncl to endExcl}` and vice versa
+    - Unbounded `field:[start to *]`, `field:[* to end]`
 Tantivy 0.6
 ==========================

 Special thanks to @drusellers and @jason-wolfe for their contributions
 to this release!

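The new syntax can be exercised directly through `QueryParser`. The sketch below is illustrative rather than taken from the repository: the schema and the `title` field are made up, and only the query strings follow the grammar listed above.

```rust
extern crate tantivy;

use tantivy::query::QueryParser;
use tantivy::schema::{SchemaBuilder, TEXT};
use tantivy::Index;

fn main() {
    // Hypothetical single-field schema, just enough to build a parser.
    let mut schema_builder = SchemaBuilder::new();
    let title = schema_builder.add_text_field("title", TEXT);
    let schema = schema_builder.build();
    let index = Index::create_in_ram(schema);

    let query_parser = QueryParser::for_index(&index, vec![title]);

    // AllQuery: matches every document in the index.
    let all_query = query_parser.parse_query("*").expect("`*` should parse");

    // RangeQuery: inclusive lower bound, exclusive upper bound.
    let range_query = query_parser
        .parse_query("title:[aardvark to apple}")
        .expect("range syntax should parse");

    let _ = (all_query, range_query);
}
```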
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "tantivy"
-version = "0.6.0"
+version = "0.6.1"
 authors = ["Paul Masurel <paul.masurel@gmail.com>"]
 license = "MIT"
 categories = ["database-implementations", "data-structures"]
@@ -9,7 +9,7 @@ documentation = "https://tantivy-search.github.io/tantivy/tantivy/index.html"
 homepage = "https://github.com/tantivy-search/tantivy"
 repository = "https://github.com/tantivy-search/tantivy"
 readme = "README.md"
-keywords = ["search", "search engine", "information", "retrieval"]
+keywords = ["search", "information", "retrieval"]

 [dependencies]
 base64 = "0.9.1"
@@ -45,7 +45,9 @@ rust-stemmers = "0.1.0"
 downcast = { version="0.9" }
 matches = "0.1"
 bitpacking = "0.5"
+census = "0.1"
 fnv = "1.0.6"
+owned-read = "0.1"

 [target.'cfg(windows)'.dependencies]
 winapi = "0.2"
appveyor.yml

@@ -4,7 +4,7 @@
 os: Visual Studio 2015
 environment:
   matrix:
-    - channel: nightly
+    - channel: stable
      target: x86_64-pc-windows-msvc

install:
@@ -342,19 +342,16 @@ impl FacetCollector {
     pub fn harvest(mut self) -> FacetCounts {
         self.finalize_segment();

-        let collapsed_facet_ords: Vec<&[u64]> = self
-            .segment_counters
+        let collapsed_facet_ords: Vec<&[u64]> = self.segment_counters
             .iter()
             .map(|segment_counter| &segment_counter.facet_ords[..])
             .collect();
-        let collapsed_facet_counts: Vec<&[u64]> = self
-            .segment_counters
+        let collapsed_facet_counts: Vec<&[u64]> = self.segment_counters
             .iter()
             .map(|segment_counter| &segment_counter.facet_counts[..])
             .collect();

-        let facet_streams = self
-            .segment_counters
+        let facet_streams = self.segment_counters
             .iter()
             .map(|seg_counts| seg_counts.facet_reader.facet_dict().range().into_stream())
             .collect::<Vec<_>>();

@@ -405,8 +402,7 @@ impl Collector for FacetCollector {

     fn collect(&mut self, doc: DocId, _: Score) {
         let facet_reader: &mut FacetReader = unsafe {
-            &mut *self
-                .ff_reader
+            &mut *self.ff_reader
                 .as_ref()
                 .expect("collect() was called before set_segment. This should never happen.")
                 .get()
@@ -161,13 +161,11 @@ impl Collector for TopCollector {
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
if self.at_capacity() {
|
||||
// It's ok to unwrap as long as a limit of 0 is forbidden.
|
||||
let limit_doc: GlobalScoredDoc = *self
|
||||
.heap
|
||||
let limit_doc: GlobalScoredDoc = *self.heap
|
||||
.peek()
|
||||
.expect("Top collector with size 0 is forbidden");
|
||||
if limit_doc.score < score {
|
||||
let mut mut_head = self
|
||||
.heap
|
||||
let mut mut_head = self.heap
|
||||
.peek_mut()
|
||||
.expect("Top collector with size 0 is forbidden");
|
||||
mut_head.score = score;
|
||||
|
||||
@@ -72,8 +72,7 @@ impl<W: Write> CompositeWrite<W> {
|
||||
let footer_offset = self.write.written_bytes();
|
||||
VInt(self.offsets.len() as u64).serialize(&mut self.write)?;
|
||||
|
||||
let mut offset_fields: Vec<_> = self
|
||||
.offsets
|
||||
let mut offset_fields: Vec<_> = self.offsets
|
||||
.iter()
|
||||
.map(|(file_addr, offset)| (*offset, *file_addr))
|
||||
.collect();
|
||||
|
||||
@@ -34,8 +34,7 @@ impl BlockEncoder {
|
||||
let num_bits = self.bitpacker.num_bits_sorted(offset, block);
|
||||
self.output[0] = num_bits;
|
||||
let written_size =
|
||||
1 + self
|
||||
.bitpacker
|
||||
1 + self.bitpacker
|
||||
.compress_sorted(offset, block, &mut self.output[1..], num_bits);
|
||||
&self.output[..written_size]
|
||||
}
|
||||
@@ -43,8 +42,7 @@ impl BlockEncoder {
|
||||
pub fn compress_block_unsorted(&mut self, block: &[u32]) -> &[u8] {
|
||||
let num_bits = self.bitpacker.num_bits(block);
|
||||
self.output[0] = num_bits;
|
||||
let written_size = 1 + self
|
||||
.bitpacker
|
||||
let written_size = 1 + self.bitpacker
|
||||
.compress(block, &mut self.output[1..], num_bits);
|
||||
&self.output[..written_size]
|
||||
}
|
||||
@@ -85,8 +83,7 @@ impl BlockDecoder {
|
||||
pub fn uncompress_block_unsorted<'a>(&mut self, compressed_data: &'a [u8]) -> usize {
|
||||
let num_bits = compressed_data[0];
|
||||
self.output_len = COMPRESSION_BLOCK_SIZE;
|
||||
1 + self
|
||||
.bitpacker
|
||||
1 + self.bitpacker
|
||||
.decompress(&compressed_data[1..], &mut self.output, num_bits)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,8 @@
 use compression::compressed_block_size;
 use compression::BlockDecoder;
 use compression::COMPRESSION_BLOCK_SIZE;
-use directory::{ReadOnlySource, SourceRead};
+use directory::ReadOnlySource;
+use owned_read::OwnedRead;

 /// Reads a stream of compressed ints.
 ///
@@ -10,7 +11,7 @@ use directory::{ReadOnlySource, SourceRead};
 /// The `.skip(...)` makes it possible to avoid
 /// decompressing blocks that are not required.
 pub struct CompressedIntStream {
-    buffer: SourceRead,
+    buffer: OwnedRead,

     block_decoder: BlockDecoder,
     cached_addr: usize, // address of the currently decoded block
@@ -24,7 +25,7 @@ impl CompressedIntStream {
     /// Opens a compressed int stream.
     pub(crate) fn wrap(source: ReadOnlySource) -> CompressedIntStream {
         CompressedIntStream {
-            buffer: SourceRead::from(source),
+            buffer: OwnedRead::new(source),
             block_decoder: BlockDecoder::new(),
             cached_addr: usize::max_value(),
             cached_next_addr: usize::max_value(),
@@ -42,8 +43,7 @@ impl CompressedIntStream {
             // no need to read.
             self.cached_next_addr
         } else {
-            let next_addr = addr + self
-                .block_decoder
+            let next_addr = addr + self.block_decoder
                 .uncompress_block_unsorted(self.buffer.slice_from(addr));
             self.cached_addr = addr;
             self.cached_next_addr = next_addr;
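Throughout this changeset the in-tree `SourceRead` cursor is swapped for `OwnedRead` from the new `owned-read` dependency. The sketch below is illustrative and not part of the diff: it only uses calls that appear in this changeset (`ReadOnlySource::from`, `OwnedRead::new`, `slice_from`), and it assumes `advance` keeps the semantics of the removed `SourceRead` (drop the first `len` bytes of the cursor).

```rust
use directory::ReadOnlySource;
use owned_read::OwnedRead;

// Hypothetical helper: skip a 4-byte header and return the first payload byte.
fn peek_after_header(data: Vec<u8>) -> u8 {
    // ReadOnlySource can be built from an owned Vec<u8>
    // (see `impl From<Vec<u8>> for ReadOnlySource` later in this diff).
    let source = ReadOnlySource::from(data);
    let mut reader = OwnedRead::new(source);

    // Advance past the header, then slice from the new cursor position.
    reader.advance(4);
    reader.slice_from(0)[0]
}
```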
@@ -191,8 +191,7 @@ impl Index {

     /// Returns the list of segments that are searchable
     pub fn searchable_segments(&self) -> Result<Vec<Segment>> {
-        Ok(self
-            .searchable_segment_metas()?
+        Ok(self.searchable_segment_metas()?
             .into_iter()
             .map(|segment_meta| self.segment(segment_meta))
             .collect())
@@ -205,8 +204,8 @@ impl Index {

     /// Creates a new segment.
     pub fn new_segment(&self) -> Segment {
-        let segment_meta = SegmentMeta::new(SegmentId::generate_random());
-        create_segment(self.clone(), segment_meta)
+        let segment_meta = SegmentMeta::new(SegmentId::generate_random(), 0);
+        self.segment(segment_meta)
     }

     /// Return a reference to the index directory.
@@ -227,8 +226,7 @@ impl Index {

     /// Returns the list of segment ids that are searchable.
     pub fn searchable_segment_ids(&self) -> Result<Vec<SegmentId>> {
-        Ok(self
-            .searchable_segment_metas()?
+        Ok(self.searchable_segment_metas()?
             .iter()
             .map(|segment_meta| segment_meta.id())
             .collect())
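For context, a hedged sketch of how these accessors are typically used. The function below is not part of the diff; it assumes an already-built `Index` and simply exercises the two methods touched by this hunk.

```rust
// Hypothetical usage sketch (not from the diff).
fn list_segments(index: &tantivy::Index) -> tantivy::Result<()> {
    // Enumerate the segments that are currently searchable.
    for segment_id in index.searchable_segment_ids()? {
        println!("searchable segment: {:?}", segment_id);
    }

    // `new_segment()` now stamps the meta with a max_doc of 0 at creation time;
    // the SegmentMeta is registered in the global inventory (see segment_meta.rs below).
    let segment = index.new_segment();
    println!("created segment: {:?}", segment.id());
    Ok(())
}
```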
@@ -1,6 +1,6 @@
 use common::BinarySerializable;
 use compression::CompressedIntStream;
-use directory::{ReadOnlySource, SourceRead};
+use directory::ReadOnlySource;
 use postings::FreqReadingOption;
 use postings::TermInfo;
 use postings::{BlockSegmentPostings, SegmentPostings};
@@ -8,6 +8,7 @@ use schema::FieldType;
 use schema::IndexRecordOption;
 use schema::Term;
 use termdict::TermDictionary;
+use owned_read::OwnedRead;

 /// The inverted index reader is in charge of accessing
 /// the inverted index associated to a specific field.
@@ -92,7 +93,7 @@ impl InvertedIndexReader {
         let offset = term_info.postings_offset as usize;
         let end_source = self.postings_source.len();
         let postings_slice = self.postings_source.slice(offset, end_source);
-        let postings_reader = SourceRead::from(postings_slice);
+        let postings_reader = OwnedRead::new(postings_slice);
         block_postings.reset(term_info.doc_freq as usize, postings_reader);
     }

@@ -114,7 +115,7 @@ impl InvertedIndexReader {
         };
         BlockSegmentPostings::from_data(
             term_info.doc_freq as usize,
-            SourceRead::from(postings_data),
+            OwnedRead::new(postings_data),
             freq_reading_option,
         )
     }
@@ -87,8 +87,7 @@ impl<T> Deref for LeasedItem<T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &T {
|
||||
&self
|
||||
.gen_item
|
||||
&self.gen_item
|
||||
.as_ref()
|
||||
.expect("Unwrapping a leased item should never fail")
|
||||
.item // unwrap is safe here
|
||||
@@ -97,8 +96,7 @@ impl<T> Deref for LeasedItem<T> {
|
||||
|
||||
impl<T> DerefMut for LeasedItem<T> {
|
||||
fn deref_mut(&mut self) -> &mut T {
|
||||
&mut self
|
||||
.gen_item
|
||||
&mut self.gen_item
|
||||
.as_mut()
|
||||
.expect("Unwrapping a mut leased item should never fail")
|
||||
.item // unwrap is safe here
|
||||
|
||||
@@ -78,8 +78,7 @@ impl Searcher {
|
||||
|
||||
/// Return the field searcher associated to a `Field`.
|
||||
pub fn field(&self, field: Field) -> FieldSearcher {
|
||||
let inv_index_readers = self
|
||||
.segment_readers
|
||||
let inv_index_readers = self.segment_readers
|
||||
.iter()
|
||||
.map(|segment_reader| segment_reader.inverted_index(field))
|
||||
.collect::<Vec<_>>();
|
||||
@@ -99,8 +98,7 @@ impl FieldSearcher {
|
||||
/// Returns a Stream over all of the sorted unique terms of
|
||||
/// for the given field.
|
||||
pub fn terms(&self) -> TermMerger {
|
||||
let term_streamers: Vec<_> = self
|
||||
.inv_index_readers
|
||||
let term_streamers: Vec<_> = self.inv_index_readers
|
||||
.iter()
|
||||
.map(|inverted_index| inverted_index.terms().stream())
|
||||
.collect();
|
||||
@@ -110,8 +108,7 @@ impl FieldSearcher {
|
||||
|
||||
impl fmt::Debug for Searcher {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let segment_ids = self
|
||||
.segment_readers
|
||||
let segment_ids = self.segment_readers
|
||||
.iter()
|
||||
.map(|segment_reader| segment_reader.segment_id())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@@ -4,7 +4,7 @@ use core::SegmentId;
 use core::SegmentMeta;
 use directory::error::{OpenReadError, OpenWriteError};
 use directory::Directory;
-use directory::{FileProtection, ReadOnlySource, WritePtr};
+use directory::{ReadOnlySource, WritePtr};
 use indexer::segment_serializer::SegmentSerializer;
 use schema::Schema;
 use std::fmt;
@@ -28,6 +28,7 @@ impl fmt::Debug for Segment {
 /// Creates a new segment given an `Index` and a `SegmentId`
 ///
 /// The function is here to make it private outside `tantivy`.
+/// #[doc(hidden)]
 pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment {
     Segment { index, meta }
 }
@@ -49,8 +50,11 @@ impl Segment {
     }

     #[doc(hidden)]
-    pub fn set_delete_meta(&mut self, num_deleted_docs: u32, opstamp: u64) {
-        self.meta.set_delete_meta(num_deleted_docs, opstamp);
+    pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment {
+        Segment {
+            index: self.index,
+            meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),
+        }
     }

     /// Returns the segment's id.
@@ -66,16 +70,6 @@ impl Segment {
         self.meta.relative_path(component)
     }

-    /// Protects a specific component file from being deleted.
-    ///
-    /// Returns a FileProtection object. The file is guaranteed
-    /// to not be garbage collected as long as this `FileProtection` object
-    /// lives.
-    pub fn protect_from_delete(&self, component: SegmentComponent) -> FileProtection {
-        let path = self.relative_path(component);
-        self.index.directory().protect_file_from_delete(&path)
-    }
-
     /// Open one of the component file for a *regular* read.
     pub fn open_read(
         &self,
@@ -105,35 +99,3 @@ pub trait SerializableSegment {
     /// The number of documents in the segment.
     fn write(&self, serializer: SegmentSerializer) -> Result<u32>;
 }
-
-#[cfg(test)]
-mod tests {
-
-    use core::SegmentComponent;
-    use directory::Directory;
-    use schema::SchemaBuilder;
-    use std::collections::HashSet;
-    use Index;
-
-    #[test]
-    fn test_segment_protect_component() {
-        let mut index = Index::create_in_ram(SchemaBuilder::new().build());
-        let segment = index.new_segment();
-        let path = segment.relative_path(SegmentComponent::POSTINGS);
-
-        let directory = index.directory_mut();
-        directory.atomic_write(&*path, &vec![0u8]).unwrap();
-
-        let living_files = HashSet::new();
-        {
-            let _file_protection = segment.protect_from_delete(SegmentComponent::POSTINGS);
-            assert!(directory.exists(&*path));
-            directory.garbage_collect(|| living_files.clone());
-            assert!(directory.exists(&*path));
-        }
-
-        directory.garbage_collect(|| living_files);
-        assert!(!directory.exists(&*path));
-    }
-
-}
@@ -1,8 +1,15 @@
 use super::SegmentComponent;
+use census::{Inventory, TrackedObject};
 use core::SegmentId;
+use serde;
 use std::collections::HashSet;
+use std::fmt;
 use std::path::PathBuf;

+lazy_static! {
+    static ref INVENTORY: Inventory<InnerSegmentMeta> = { Inventory::new() };
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 struct DeleteMeta {
     num_deleted_docs: u32,
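The `INVENTORY` static above relies on the new `census` dependency: an `Inventory` hands out `TrackedObject` handles and can list every handle that is still alive. A minimal standalone sketch of that pattern follows, grounded only in the calls that appear in this diff (`Inventory::new`, `track`, `list`, and dereferencing the tracked handle); the `FileSet` type and the exact return type of `list()` are assumptions.

```rust
extern crate census;

use census::{Inventory, TrackedObject};

// Made-up stand-in for `InnerSegmentMeta`.
#[derive(Debug)]
struct FileSet {
    name: String,
}

fn main() {
    let inventory: Inventory<FileSet> = Inventory::new();

    // `track` registers the value and returns a handle that derefs to it.
    let tracked: TrackedObject<FileSet> = inventory.track(FileSet {
        name: "segment-0001".to_string(),
    });
    println!("tracking {}", tracked.name);

    // Every handle still alive is visible through `list`; this is how
    // `SegmentMeta::all()` enumerates the living segment metas below.
    for obj in inventory.list() {
        println!("still alive: {}", obj.name);
    }

    // Once the last handle is dropped, the entry disappears from the inventory.
    drop(tracked);
    assert_eq!(inventory.list().into_iter().count(), 0);
}
```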
@@ -13,32 +20,72 @@ struct DeleteMeta {
 ///
 /// For instance the number of docs it contains,
 /// how many are deleted, etc.
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone)]
 pub struct SegmentMeta {
-    segment_id: SegmentId,
-    max_doc: u32,
-    deletes: Option<DeleteMeta>,
+    tracked: TrackedObject<InnerSegmentMeta>,
 }

+impl fmt::Debug for SegmentMeta {
+    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+        self.tracked.fmt(f)
+    }
+}
+
+impl serde::Serialize for SegmentMeta {
+    fn serialize<S>(
+        &self,
+        serializer: S,
+    ) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
+    where
+        S: serde::Serializer,
+    {
+        self.tracked.serialize(serializer)
+    }
+}
+
+impl<'a> serde::Deserialize<'a> for SegmentMeta {
+    fn deserialize<D>(deserializer: D) -> Result<Self, <D as serde::Deserializer<'a>>::Error>
+    where
+        D: serde::Deserializer<'a>,
+    {
+        let inner = InnerSegmentMeta::deserialize(deserializer)?;
+        let tracked = INVENTORY.track(inner);
+        Ok(SegmentMeta { tracked: tracked })
+    }
+}
+
 impl SegmentMeta {
-    /// Creates a new segment meta for
-    /// a segment with no deletes and no documents.
-    pub fn new(segment_id: SegmentId) -> SegmentMeta {
-        SegmentMeta {
+    /// Lists all living `SegmentMeta` object at the time of the call.
+    pub fn all() -> Vec<SegmentMeta> {
+        INVENTORY
+            .list()
+            .into_iter()
+            .map(|inner| SegmentMeta { tracked: inner })
+            .collect::<Vec<_>>()
+    }
+
+    /// Creates a new `SegmentMeta` object.
+    #[doc(hidden)]
+    pub fn new(segment_id: SegmentId, max_doc: u32) -> SegmentMeta {
+        let inner = InnerSegmentMeta {
             segment_id,
-            max_doc: 0,
+            max_doc,
             deletes: None,
-        }
+        };
+        SegmentMeta {
+            tracked: INVENTORY.track(inner),
+        }
     }

     /// Returns the segment id.
     pub fn id(&self) -> SegmentId {
-        self.segment_id
+        self.tracked.segment_id
     }

     /// Returns the number of deleted documents.
     pub fn num_deleted_docs(&self) -> u32 {
-        self.deletes
+        self.tracked
+            .deletes
             .as_ref()
             .map(|delete_meta| delete_meta.num_deleted_docs)
             .unwrap_or(0u32)
@@ -80,7 +127,7 @@ impl SegmentMeta {
     /// and all the doc ids contains in this segment
     /// are exactly (0..max_doc).
     pub fn max_doc(&self) -> u32 {
-        self.max_doc
+        self.tracked.max_doc
     }

     /// Return the number of documents in the segment.
@@ -91,25 +138,36 @@ impl SegmentMeta {
     /// Returns the opstamp of the last delete operation
     /// taken in account in this segment.
     pub fn delete_opstamp(&self) -> Option<u64> {
-        self.deletes.as_ref().map(|delete_meta| delete_meta.opstamp)
+        self.tracked
+            .deletes
+            .as_ref()
+            .map(|delete_meta| delete_meta.opstamp)
     }

     /// Returns true iff the segment meta contains
     /// delete information.
     pub fn has_deletes(&self) -> bool {
-        self.deletes.is_some()
+        self.num_deleted_docs() > 0
     }

-    #[doc(hidden)]
-    pub fn set_max_doc(&mut self, max_doc: u32) {
-        self.max_doc = max_doc;
-    }
-
     #[doc(hidden)]
-    pub fn set_delete_meta(&mut self, num_deleted_docs: u32, opstamp: u64) {
-        self.deletes = Some(DeleteMeta {
+    pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta {
+        let delete_meta = DeleteMeta {
             num_deleted_docs,
             opstamp,
+        };
+        let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
+            segment_id: inner_meta.segment_id,
+            max_doc: inner_meta.max_doc,
+            deletes: Some(delete_meta),
+        });
+        SegmentMeta { tracked }
     }
 }

+#[derive(Clone, Debug, Serialize, Deserialize)]
+struct InnerSegmentMeta {
+    segment_id: SegmentId,
+    max_doc: u32,
+    deletes: Option<DeleteMeta>,
+}
+
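Putting the pieces together, here is a hedged sketch (not part of the diff) of the reshaped `SegmentMeta` API: `new` now takes `max_doc` up front, `with_delete_meta` consumes the value and returns a new one, and `all()` lists every `SegmentMeta` still referenced anywhere in the process, which is what garbage collection relies on in place of the removed `FileProtection` mechanism. The numbers and the printing are purely illustrative.

```rust
use core::SegmentId;
use core::SegmentMeta;

fn track_segments() {
    // A fresh segment meta for a segment holding 1000 documents.
    let meta = SegmentMeta::new(SegmentId::generate_random(), 1000);

    // Recording deletes no longer mutates in place; it returns a new meta whose
    // inner data stays registered in the inventory.
    let meta = meta.with_delete_meta(17, /* opstamp */ 42);
    assert_eq!(meta.num_deleted_docs(), 17);

    // Every live SegmentMeta (this one included) shows up here, so its files
    // count as living files during garbage collection.
    for living in SegmentMeta::all() {
        println!("live segment: {:?} ({} docs)", living.id(), living.max_doc());
    }
}
```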
@@ -156,13 +156,11 @@ impl SegmentReader {
|
||||
&FieldType::Bytes => {}
|
||||
_ => return Err(FastFieldNotAvailableError::new(field_entry)),
|
||||
}
|
||||
let idx_reader = self
|
||||
.fast_fields_composite
|
||||
let idx_reader = self.fast_fields_composite
|
||||
.open_read_with_idx(field, 0)
|
||||
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))
|
||||
.map(FastFieldReader::open)?;
|
||||
let values = self
|
||||
.fast_fields_composite
|
||||
let values = self.fast_fields_composite
|
||||
.open_read_with_idx(field, 1)
|
||||
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))?;
|
||||
Ok(BytesFastFieldReader::open(idx_reader, values))
|
||||
@@ -274,8 +272,7 @@ impl SegmentReader {
|
||||
/// term dictionary associated to a specific field,
|
||||
/// and opening the posting list associated to any term.
|
||||
pub fn inverted_index(&self, field: Field) -> Arc<InvertedIndexReader> {
|
||||
if let Some(inv_idx_reader) = self
|
||||
.inv_idx_reader_cache
|
||||
if let Some(inv_idx_reader) = self.inv_idx_reader_cache
|
||||
.read()
|
||||
.expect("Lock poisoned. This should never happen")
|
||||
.get(&field)
|
||||
@@ -304,13 +301,11 @@ impl SegmentReader {
|
||||
|
||||
let postings_source = postings_source_opt.unwrap();
|
||||
|
||||
let termdict_source = self
|
||||
.termdict_composite
|
||||
let termdict_source = self.termdict_composite
|
||||
.open_read(field)
|
||||
.expect("Failed to open field term dictionary in composite file. Is the field indexed");
|
||||
|
||||
let positions_source = self
|
||||
.positions_composite
|
||||
let positions_source = self.positions_composite
|
||||
.open_read(field)
|
||||
.expect("Index corrupted. Failed to open field positions in composite file.");
|
||||
|
||||
|
||||
@@ -173,9 +173,6 @@ pub enum DeleteError {
     /// Any kind of IO error that happens when
     /// interacting with the underlying IO device.
     IOError(IOError),
-    /// The file may not be deleted because it is
-    /// protected.
-    FileProtected(PathBuf),
 }

 impl From<IOError> for DeleteError {
@@ -190,9 +187,6 @@ impl fmt::Display for DeleteError {
|
||||
DeleteError::FileDoesNotExist(ref path) => {
|
||||
write!(f, "the file '{:?}' does not exist", path)
|
||||
}
|
||||
DeleteError::FileProtected(ref path) => {
|
||||
write!(f, "the file '{:?}' is protected and can't be deleted", path)
|
||||
}
|
||||
DeleteError::IOError(ref err) => {
|
||||
write!(f, "an io error occurred while deleting a file: '{}'", err)
|
||||
}
|
||||
@@ -207,7 +201,7 @@ impl StdError for DeleteError {
|
||||
|
||||
fn cause(&self) -> Option<&StdError> {
|
||||
match *self {
|
||||
DeleteError::FileDoesNotExist(_) | DeleteError::FileProtected(_) => None,
|
||||
DeleteError::FileDoesNotExist(_) => None,
|
||||
DeleteError::IOError(ref err) => Some(err),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,9 +3,7 @@ use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
|
||||
use directory::{ReadOnlySource, WritePtr};
|
||||
use error::{ErrorKind, Result, ResultExt};
|
||||
use serde_json;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -32,37 +30,6 @@ pub struct ManagedDirectory {
|
||||
#[derive(Debug, Default)]
|
||||
struct MetaInformation {
|
||||
managed_paths: HashSet<PathBuf>,
|
||||
protected_files: HashMap<PathBuf, usize>,
|
||||
}
|
||||
|
||||
/// A `FileProtection` prevents the garbage collection of a file.
|
||||
///
|
||||
/// See `ManagedDirectory.protect_file_from_delete`.
|
||||
pub struct FileProtection {
|
||||
directory: ManagedDirectory,
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
fn unprotect_file_from_delete(directory: &ManagedDirectory, path: &Path) {
|
||||
let mut meta_informations_wlock = directory
|
||||
.meta_informations
|
||||
.write()
|
||||
.expect("Managed file lock poisoned");
|
||||
if let Some(counter_ref_mut) = meta_informations_wlock.protected_files.get_mut(path) {
|
||||
(*counter_ref_mut) -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for FileProtection {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
|
||||
write!(formatter, "FileProtectionFor({:?})", self.path)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for FileProtection {
|
||||
fn drop(&mut self) {
|
||||
unprotect_file_from_delete(&self.directory, &*self.path);
|
||||
}
|
||||
}
|
||||
|
||||
/// Saves the file containing the list of existing files
|
||||
@@ -89,7 +56,6 @@ impl ManagedDirectory {
|
||||
directory: Box::new(directory),
|
||||
meta_informations: Arc::new(RwLock::new(MetaInformation {
|
||||
managed_paths: managed_files,
|
||||
protected_files: HashMap::default(),
|
||||
})),
|
||||
})
|
||||
}
|
||||
@@ -117,8 +83,7 @@ impl ManagedDirectory {
|
||||
let mut files_to_delete = vec![];
|
||||
{
|
||||
// releasing the lock as .delete() will use it too.
|
||||
let meta_informations_rlock = self
|
||||
.meta_informations
|
||||
let meta_informations_rlock = self.meta_informations
|
||||
.read()
|
||||
.expect("Managed directory rlock poisoned in garbage collect.");
|
||||
|
||||
@@ -159,9 +124,6 @@ impl ManagedDirectory {
|
||||
error!("Failed to delete {:?}", file_to_delete);
|
||||
}
|
||||
}
|
||||
DeleteError::FileProtected(_) => {
|
||||
// this is expected.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -171,8 +133,7 @@ impl ManagedDirectory {
|
||||
if !deleted_files.is_empty() {
|
||||
// update the list of managed files by removing
|
||||
// the file that were removed.
|
||||
let mut meta_informations_wlock = self
|
||||
.meta_informations
|
||||
let mut meta_informations_wlock = self.meta_informations
|
||||
.write()
|
||||
.expect("Managed directory wlock poisoned (2).");
|
||||
{
|
||||
@@ -187,29 +148,6 @@ impl ManagedDirectory {
|
||||
}
|
||||
}
|
||||
|
||||
/// Protects a file from being garbage collected.
|
||||
///
|
||||
/// The method returns a `FileProtection` object.
|
||||
/// The file will not be garbage collected as long as the
|
||||
/// `FileProtection` object is kept alive.
|
||||
pub fn protect_file_from_delete(&self, path: &Path) -> FileProtection {
|
||||
let pathbuf = path.to_owned();
|
||||
{
|
||||
let mut meta_informations_wlock = self
|
||||
.meta_informations
|
||||
.write()
|
||||
.expect("Managed file lock poisoned on protect");
|
||||
*meta_informations_wlock
|
||||
.protected_files
|
||||
.entry(pathbuf.clone())
|
||||
.or_insert(0) += 1;
|
||||
}
|
||||
FileProtection {
|
||||
directory: self.clone(),
|
||||
path: pathbuf.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers a file as managed
|
||||
///
|
||||
/// This method must be called before the file is
|
||||
@@ -218,8 +156,7 @@ impl ManagedDirectory {
|
||||
/// will not lead to garbage files that will
|
||||
/// never get removed.
|
||||
fn register_file_as_managed(&mut self, filepath: &Path) -> io::Result<()> {
|
||||
let mut meta_wlock = self
|
||||
.meta_informations
|
||||
let mut meta_wlock = self.meta_informations
|
||||
.write()
|
||||
.expect("Managed file lock poisoned");
|
||||
let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());
|
||||
@@ -251,17 +188,6 @@ impl Directory for ManagedDirectory {
|
||||
}
|
||||
|
||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
||||
{
|
||||
let metas_rlock = self
|
||||
.meta_informations
|
||||
.read()
|
||||
.expect("poisoned lock in managed directory meta");
|
||||
if let Some(counter) = metas_rlock.protected_files.get(path) {
|
||||
if *counter > 0 {
|
||||
return Err(DeleteError::FileProtected(path.to_owned()));
|
||||
}
|
||||
}
|
||||
}
|
||||
self.directory.delete(path)
|
||||
}
|
||||
|
||||
@@ -377,28 +303,4 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(feature = "mmap")]
|
||||
fn test_managed_directory_protect() {
|
||||
let tempdir = TempDir::new("index").unwrap();
|
||||
let tempdir_path = PathBuf::from(tempdir.path());
|
||||
let living_files = HashSet::new();
|
||||
|
||||
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
|
||||
let mut managed_directory = ManagedDirectory::new(mmap_directory).unwrap();
|
||||
managed_directory
|
||||
.atomic_write(*TEST_PATH1, &vec![0u8, 1u8])
|
||||
.unwrap();
|
||||
assert!(managed_directory.exists(*TEST_PATH1));
|
||||
|
||||
{
|
||||
let _file_protection = managed_directory.protect_file_from_delete(*TEST_PATH1);
|
||||
managed_directory.garbage_collect(|| living_files.clone());
|
||||
assert!(managed_directory.exists(*TEST_PATH1));
|
||||
}
|
||||
|
||||
managed_directory.garbage_collect(|| living_files.clone());
|
||||
assert!(!managed_directory.exists(*TEST_PATH1));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -32,8 +32,7 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadE
|
||||
}
|
||||
})?;
|
||||
|
||||
let meta_data = file
|
||||
.metadata()
|
||||
let meta_data = file.metadata()
|
||||
.map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
|
||||
if meta_data.len() == 0 {
|
||||
// if the file size is 0, it will not be possible
|
||||
@@ -310,8 +309,7 @@ impl Directory for MmapDirectory {
|
||||
// when the last reference is gone.
|
||||
mmap_cache.cache.remove(&full_path);
|
||||
match fs::remove_file(&full_path) {
|
||||
Ok(_) => self
|
||||
.sync_directory()
|
||||
Ok(_) => self.sync_directory()
|
||||
.map_err(|e| IOError::with_path(path.to_owned(), e).into()),
|
||||
Err(e) => {
|
||||
if e.kind() == io::ErrorKind::NotFound {
|
||||
|
||||
@@ -25,8 +25,7 @@ pub use self::read_only_source::ReadOnlySource;
 #[cfg(feature = "mmap")]
 pub use self::mmap_directory::MmapDirectory;

-pub(crate) use self::managed_directory::{FileProtection, ManagedDirectory};
-pub(crate) use self::read_only_source::SourceRead;
+pub(crate) use self::managed_directory::ManagedDirectory;

 /// Synonym of Seek + Write
 pub trait SeekableWrite: Seek + Write {}
@@ -170,8 +170,7 @@ impl Directory for RAMDirectory {
|
||||
let path_buf = PathBuf::from(path);
|
||||
let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone());
|
||||
|
||||
let exists = self
|
||||
.fs
|
||||
let exists = self.fs
|
||||
.write(path_buf.clone(), &Vec::new())
|
||||
.map_err(|err| IOError::with_path(path.to_owned(), err))?;
|
||||
|
||||
|
||||
@@ -3,9 +3,8 @@ use common::HasLen;
|
||||
#[cfg(feature = "mmap")]
|
||||
use fst::raw::MmapReadOnly;
|
||||
use stable_deref_trait::{CloneStableDeref, StableDeref};
|
||||
use std::io::{self, Read};
|
||||
use std::ops::Deref;
|
||||
use std::slice;
|
||||
|
||||
|
||||
/// Read object that represents files in tantivy.
|
||||
///
|
||||
@@ -120,49 +119,3 @@ impl From<Vec<u8>> for ReadOnlySource {
|
||||
ReadOnlySource::Anonymous(shared_data)
|
||||
}
|
||||
}
|
||||
|
||||
/// Acts as a owning cursor over the data backed up by a `ReadOnlySource`
|
||||
pub(crate) struct SourceRead {
|
||||
_data_owner: ReadOnlySource,
|
||||
cursor: &'static [u8],
|
||||
}
|
||||
|
||||
impl SourceRead {
|
||||
// Advance the cursor by a given number of bytes.
|
||||
pub fn advance(&mut self, len: usize) {
|
||||
self.cursor = &self.cursor[len..];
|
||||
}
|
||||
|
||||
pub fn slice_from(&self, start: usize) -> &[u8] {
|
||||
&self.cursor[start..]
|
||||
}
|
||||
|
||||
pub fn get(&self, idx: usize) -> u8 {
|
||||
self.cursor[idx]
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for SourceRead {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.cursor
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ReadOnlySource> for SourceRead {
|
||||
// Creates a new `SourceRead` from a given `ReadOnlySource`
|
||||
fn from(source: ReadOnlySource) -> SourceRead {
|
||||
let len = source.len();
|
||||
let slice_ptr = source.as_slice().as_ptr();
|
||||
let static_slice = unsafe { slice::from_raw_parts(slice_ptr, len) };
|
||||
SourceRead {
|
||||
_data_owner: source,
|
||||
cursor: static_slice,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for SourceRead {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.cursor.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,8 +41,7 @@ pub struct DeleteBitSet {
|
||||
impl DeleteBitSet {
|
||||
/// Opens a delete bitset given its data source.
|
||||
pub fn open(data: ReadOnlySource) -> DeleteBitSet {
|
||||
let num_deleted: usize = data
|
||||
.as_slice()
|
||||
let num_deleted: usize = data.as_slice()
|
||||
.iter()
|
||||
.map(|b| b.count_ones() as usize)
|
||||
.sum();
|
||||
|
||||
@@ -56,8 +56,7 @@ impl FacetReader {
|
||||
|
||||
/// Given a term ordinal returns the term associated to it.
|
||||
pub fn facet_from_ord(&self, facet_ord: TermOrdinal, output: &mut Facet) {
|
||||
let found_term = self
|
||||
.term_dict
|
||||
let found_term = self.term_dict
|
||||
.ord_to_term(facet_ord as u64, output.inner_buffer_mut());
|
||||
assert!(found_term, "Term ordinal {} no found.", facet_ord);
|
||||
}
|
||||
|
||||
@@ -52,8 +52,7 @@ impl DeleteQueue {
|
||||
//
|
||||
// Past delete operations are not accessible.
|
||||
pub fn cursor(&self) -> DeleteCursor {
|
||||
let last_block = self
|
||||
.inner
|
||||
let last_block = self.inner
|
||||
.read()
|
||||
.expect("Read lock poisoned when opening delete queue cursor")
|
||||
.last_block
|
||||
@@ -93,8 +92,7 @@ impl DeleteQueue {
|
||||
// be some unflushed operations.
|
||||
//
|
||||
fn flush(&self) -> Option<Arc<Block>> {
|
||||
let mut self_wlock = self
|
||||
.inner
|
||||
let mut self_wlock = self.inner
|
||||
.write()
|
||||
.expect("Failed to acquire write lock on delete queue writer");
|
||||
|
||||
@@ -134,8 +132,7 @@ impl From<DeleteQueue> for NextBlock {
|
||||
impl NextBlock {
|
||||
fn next_block(&self) -> Option<Arc<Block>> {
|
||||
{
|
||||
let next_read_lock = self
|
||||
.0
|
||||
let next_read_lock = self.0
|
||||
.read()
|
||||
.expect("Failed to acquire write lock in delete queue");
|
||||
if let InnerNextBlock::Closed(ref block) = *next_read_lock {
|
||||
@@ -144,8 +141,7 @@ impl NextBlock {
|
||||
}
|
||||
let next_block;
|
||||
{
|
||||
let mut next_write_lock = self
|
||||
.0
|
||||
let mut next_write_lock = self.0
|
||||
.write()
|
||||
.expect("Failed to acquire write lock in delete queue");
|
||||
match *next_write_lock {
|
||||
|
||||
@@ -9,7 +9,6 @@ use core::SegmentComponent;
 use core::SegmentId;
 use core::SegmentMeta;
 use core::SegmentReader;
-use directory::FileProtection;
 use docset::DocSet;
 use error::{Error, ErrorKind, Result, ResultExt};
 use fastfield::write_delete_bitset;
@@ -216,15 +215,13 @@ pub fn advance_deletes(
|
||||
mut segment: Segment,
|
||||
segment_entry: &mut SegmentEntry,
|
||||
target_opstamp: u64,
|
||||
) -> Result<Option<FileProtection>> {
|
||||
let mut file_protect: Option<FileProtection> = None;
|
||||
) -> Result<()> {
|
||||
{
|
||||
if let Some(previous_opstamp) = segment_entry.meta().delete_opstamp() {
|
||||
if segment_entry.meta().delete_opstamp() == Some(target_opstamp) {
|
||||
// We are already up-to-date here.
|
||||
if target_opstamp == previous_opstamp {
|
||||
return Ok(file_protect);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let segment_reader = SegmentReader::open(&segment)?;
|
||||
let max_doc = segment_reader.max_doc();
|
||||
|
||||
@@ -243,6 +240,7 @@ pub fn advance_deletes(
|
||||
target_opstamp,
|
||||
)?;
|
||||
|
||||
// TODO optimize
|
||||
for doc in 0u32..max_doc {
|
||||
if segment_reader.is_deleted(doc) {
|
||||
delete_bitset.insert(doc as usize);
|
||||
@@ -251,14 +249,13 @@ pub fn advance_deletes(
|
||||
|
||||
let num_deleted_docs = delete_bitset.len();
|
||||
if num_deleted_docs > 0 {
|
||||
segment.set_delete_meta(num_deleted_docs as u32, target_opstamp);
|
||||
file_protect = Some(segment.protect_from_delete(SegmentComponent::DELETE));
|
||||
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
|
||||
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
|
||||
write_delete_bitset(&delete_bitset, &mut delete_file)?;
|
||||
}
|
||||
}
|
||||
segment_entry.set_meta(segment.meta().clone());
|
||||
Ok(file_protect)
|
||||
segment_entry.set_meta((*segment.meta()).clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn index_documents(
|
||||
@@ -299,8 +296,7 @@ fn index_documents(
|
||||
|
||||
let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
|
||||
|
||||
let mut segment_meta = SegmentMeta::new(segment_id);
|
||||
segment_meta.set_max_doc(num_docs);
|
||||
let segment_meta = SegmentMeta::new(segment_id, num_docs);
|
||||
|
||||
let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
|
||||
|
||||
@@ -342,8 +338,7 @@ impl IndexWriter {
|
||||
}
|
||||
drop(self.workers_join_handle);
|
||||
|
||||
let result = self
|
||||
.segment_updater
|
||||
let result = self.segment_updater
|
||||
.wait_merging_thread()
|
||||
.chain_err(|| ErrorKind::ErrorInThread("Failed to join merging thread.".into()));
|
||||
|
||||
@@ -448,7 +443,9 @@ impl IndexWriter {
     }

     /// Merges a given list of segments
-    pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Receiver<SegmentMeta> {
+    ///
+    /// `segment_ids` is required to be non-empty.
+    pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
         self.segment_updater.start_merge(segment_ids)
     }
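A sketch of the updated calling convention, mirroring the test changes later in this diff: `merge` now returns a `Result` wrapping the receiver, so a failure to even start the merge is surfaced before blocking on the merge itself. The function below is illustrative and assumes an already-built `Index`; the heap size is an arbitrary value copied from the tests.

```rust
// Hypothetical helper (not from the diff) merging all searchable segments.
fn merge_all_segments(index: &tantivy::Index) -> tantivy::Result<()> {
    let segment_ids = index.searchable_segment_ids()?;
    let mut index_writer = index.writer_with_num_threads(1, 40_000_000)?;
    index_writer
        .merge(&segment_ids)
        .expect("Failed to initiate merge")
        .wait()
        .expect("Merging failed");
    index_writer.wait_merging_threads()?;
    Ok(())
}
```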
@@ -488,8 +485,7 @@ impl IndexWriter {
|
||||
let document_receiver = self.document_receiver.clone();
|
||||
|
||||
// take the directory lock to create a new index_writer.
|
||||
let directory_lock = self
|
||||
._directory_lock
|
||||
let directory_lock = self._directory_lock
|
||||
.take()
|
||||
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
|
||||
|
||||
|
||||
@@ -116,15 +116,17 @@ mod tests {
|
||||
assert!(result_list.is_empty());
|
||||
}
|
||||
|
||||
fn seg_meta(num_docs: u32) -> SegmentMeta {
|
||||
let mut segment_metas = SegmentMeta::new(SegmentId::generate_random());
|
||||
segment_metas.set_max_doc(num_docs);
|
||||
segment_metas
|
||||
fn create_random_segment_meta(num_docs: u32) -> SegmentMeta {
|
||||
SegmentMeta::new(SegmentId::generate_random(), num_docs)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_log_merge_policy_pair() {
|
||||
let test_input = vec![seg_meta(10), seg_meta(10), seg_meta(10)];
|
||||
let test_input = vec![
|
||||
create_random_segment_meta(10),
|
||||
create_random_segment_meta(10),
|
||||
create_random_segment_meta(10),
|
||||
];
|
||||
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
|
||||
assert_eq!(result_list.len(), 1);
|
||||
}
|
||||
@@ -137,17 +139,17 @@ mod tests {
|
||||
// * one with the 3 * 1000-docs segments
|
||||
// no MergeCandidate expected for the 2 * 10_000-docs segments as min_merge_size=3
|
||||
let test_input = vec![
|
||||
seg_meta(10),
|
||||
seg_meta(10),
|
||||
seg_meta(10),
|
||||
seg_meta(1000),
|
||||
seg_meta(1000),
|
||||
seg_meta(1000),
|
||||
seg_meta(10000),
|
||||
seg_meta(10000),
|
||||
seg_meta(10),
|
||||
seg_meta(10),
|
||||
seg_meta(10),
|
||||
create_random_segment_meta(10),
|
||||
create_random_segment_meta(10),
|
||||
create_random_segment_meta(10),
|
||||
create_random_segment_meta(1000),
|
||||
create_random_segment_meta(1000),
|
||||
create_random_segment_meta(1000),
|
||||
create_random_segment_meta(10000),
|
||||
create_random_segment_meta(10000),
|
||||
create_random_segment_meta(10),
|
||||
create_random_segment_meta(10),
|
||||
create_random_segment_meta(10),
|
||||
];
|
||||
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
|
||||
assert_eq!(result_list.len(), 2);
|
||||
@@ -157,12 +159,12 @@ mod tests {
|
||||
fn test_log_merge_policy_within_levels() {
|
||||
// multiple levels all get merged correctly
|
||||
let test_input = vec![
|
||||
seg_meta(10), // log2(10) = ~3.32 (> 3.58 - 0.75)
|
||||
seg_meta(11), // log2(11) = ~3.46
|
||||
seg_meta(12), // log2(12) = ~3.58
|
||||
seg_meta(800), // log2(800) = ~9.64 (> 9.97 - 0.75)
|
||||
seg_meta(1000), // log2(1000) = ~9.97
|
||||
seg_meta(1000),
|
||||
create_random_segment_meta(10), // log2(10) = ~3.32 (> 3.58 - 0.75)
|
||||
create_random_segment_meta(11), // log2(11) = ~3.46
|
||||
create_random_segment_meta(12), // log2(12) = ~3.58
|
||||
create_random_segment_meta(800), // log2(800) = ~9.64 (> 9.97 - 0.75)
|
||||
create_random_segment_meta(1000), // log2(1000) = ~9.97
|
||||
create_random_segment_meta(1000),
|
||||
]; // log2(1000) = ~9.97
|
||||
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
|
||||
assert_eq!(result_list.len(), 2);
|
||||
@@ -171,12 +173,12 @@ mod tests {
|
||||
fn test_log_merge_policy_small_segments() {
|
||||
// segments under min_layer_size are merged together
|
||||
let test_input = vec![
|
||||
seg_meta(1),
|
||||
seg_meta(1),
|
||||
seg_meta(1),
|
||||
seg_meta(2),
|
||||
seg_meta(2),
|
||||
seg_meta(2),
|
||||
create_random_segment_meta(1),
|
||||
create_random_segment_meta(1),
|
||||
create_random_segment_meta(1),
|
||||
create_random_segment_meta(2),
|
||||
create_random_segment_meta(2),
|
||||
create_random_segment_meta(2),
|
||||
];
|
||||
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
|
||||
assert_eq!(result_list.len(), 1);
|
||||
|
||||
@@ -440,8 +440,7 @@ impl IndexMerger {
|
||||
) -> Result<Option<TermOrdinalMapping>> {
|
||||
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
|
||||
let mut delta_computer = DeltaComputer::new();
|
||||
let field_readers = self
|
||||
.readers
|
||||
let field_readers = self.readers
|
||||
.iter()
|
||||
.map(|reader| reader.inverted_index(indexed_field))
|
||||
.collect::<Vec<_>>();
|
||||
@@ -737,6 +736,7 @@ mod tests {
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
.wait()
|
||||
.expect("Merging failed");
|
||||
index_writer.wait_merging_threads().unwrap();
|
||||
@@ -980,6 +980,7 @@ mod tests {
|
||||
.expect("Searchable segments failed.");
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
.wait()
|
||||
.expect("Merging failed");
|
||||
index.load_searchers().unwrap();
|
||||
@@ -1076,6 +1077,7 @@ mod tests {
|
||||
.expect("Searchable segments failed.");
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
.wait()
|
||||
.expect("Merging failed");
|
||||
index.load_searchers().unwrap();
|
||||
@@ -1129,6 +1131,7 @@ mod tests {
|
||||
.expect("Searchable segments failed.");
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
.wait()
|
||||
.expect("Merging failed");
|
||||
index.load_searchers().unwrap();
|
||||
@@ -1219,6 +1222,7 @@ mod tests {
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
.wait()
|
||||
.expect("Merging failed");
|
||||
index_writer.wait_merging_threads().unwrap();
|
||||
@@ -1290,6 +1294,7 @@ mod tests {
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
.wait()
|
||||
.expect("Merging failed");
|
||||
index_writer.wait_merging_threads().unwrap();
|
||||
@@ -1392,6 +1397,7 @@ mod tests {
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
index_writer
|
||||
.merge(&segment_ids)
|
||||
.expect("Failed to initiate merge")
|
||||
.wait()
|
||||
.expect("Merging failed");
|
||||
index_writer.wait_merging_threads().unwrap();
|
||||
|
||||
@@ -2,6 +2,8 @@ use super::segment_register::SegmentRegister;
|
||||
use core::SegmentId;
|
||||
use core::SegmentMeta;
|
||||
use core::{LOCKFILE_FILEPATH, META_FILEPATH};
|
||||
use error::ErrorKind;
|
||||
use error::Result as TantivyResult;
|
||||
use indexer::delete_queue::DeleteCursor;
|
||||
use indexer::SegmentEntry;
|
||||
use std::collections::hash_set::HashSet;
|
||||
@@ -64,8 +66,9 @@ impl SegmentManager {
|
||||
|
||||
/// Returns all of the segment entries (committed or uncommitted)
|
||||
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
|
||||
let mut segment_entries = self.read().uncommitted.segment_entries();
|
||||
segment_entries.extend(self.read().committed.segment_entries());
|
||||
let registers_lock = self.read();
|
||||
let mut segment_entries = registers_lock.uncommitted.segment_entries();
|
||||
segment_entries.extend(registers_lock.committed.segment_entries());
|
||||
segment_entries
|
||||
}
|
||||
|
||||
@@ -76,32 +79,15 @@ impl SegmentManager {
|
||||
}
|
||||
|
||||
pub fn list_files(&self) -> HashSet<PathBuf> {
|
||||
let registers_lock = self.read();
|
||||
let mut files = HashSet::new();
|
||||
files.insert(META_FILEPATH.clone());
|
||||
files.insert(LOCKFILE_FILEPATH.clone());
|
||||
|
||||
let segment_metas: Vec<SegmentMeta> = registers_lock
|
||||
.committed
|
||||
.get_all_segments()
|
||||
.into_iter()
|
||||
.chain(registers_lock.uncommitted.get_all_segments().into_iter())
|
||||
.chain(registers_lock.writing.iter().cloned().map(SegmentMeta::new))
|
||||
.collect();
|
||||
for segment_meta in segment_metas {
|
||||
for segment_meta in SegmentMeta::all() {
|
||||
files.extend(segment_meta.list_files());
|
||||
}
|
||||
files
|
||||
}
|
||||
|
||||
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
let registers = self.read();
|
||||
registers
|
||||
.committed
|
||||
.segment_entry(segment_id)
|
||||
.or_else(|| registers.uncommitted.segment_entry(segment_id))
|
||||
}
|
||||
|
||||
// Lock poisoning should never happen :
|
||||
// The lock is acquired and released within this class,
|
||||
// and the operations cannot panic.
|
||||
@@ -126,19 +112,38 @@ impl SegmentManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_merge(&self, segment_ids: &[SegmentId]) {
|
||||
/// Marks a list of segments as in merge.
|
||||
///
|
||||
/// Returns an error if some segments are missing, or if
|
||||
/// the `segment_ids` are not either all committed or all
|
||||
/// uncommitted.
|
||||
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
|
||||
let mut registers_lock = self.write();
|
||||
let mut segment_entries = vec![];
|
||||
if registers_lock.uncommitted.contains_all(segment_ids) {
|
||||
for segment_id in segment_ids {
|
||||
registers_lock.uncommitted.start_merge(segment_id);
|
||||
let segment_entry = registers_lock.uncommitted
|
||||
.start_merge(segment_id)
|
||||
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
||||
segment_entries.push(segment_entry);
|
||||
}
|
||||
} else if registers_lock.committed.contains_all(segment_ids) {
|
||||
for segment_id in segment_ids {
|
||||
let segment_entry = registers_lock.committed
|
||||
.start_merge(segment_id)
|
||||
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
|
||||
segment_entries.push(segment_entry);
|
||||
}
|
||||
for segment_id in segment_ids {
|
||||
registers_lock.committed.start_merge(segment_id);
|
||||
}
|
||||
} else {
|
||||
error!("Merge operation sent for segments that are not all uncommited or commited.");
|
||||
let error_msg = "Merge operation sent for segments that are not \
|
||||
all uncommited or commited."
|
||||
.to_string();
|
||||
bail!(ErrorKind::InvalidArgument(error_msg))
|
||||
}
|
||||
Ok(segment_entries)
|
||||
}
|
||||
|
||||
pub fn cancel_merge(
|
||||
|
||||
@@ -3,8 +3,7 @@ use core::SegmentMeta;
|
||||
use indexer::delete_queue::DeleteCursor;
|
||||
use indexer::segment_entry::SegmentEntry;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
|
||||
/// The segment register keeps track
|
||||
/// of the list of segment, their size as well
|
||||
@@ -39,13 +38,6 @@ impl SegmentRegister {
|
||||
self.segment_states.len()
|
||||
}
|
||||
|
||||
pub fn get_all_segments(&self) -> Vec<SegmentMeta> {
|
||||
self.segment_states
|
||||
.values()
|
||||
.map(|segment_entry| segment_entry.meta().clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn get_mergeable_segments(&self) -> Vec<SegmentMeta> {
|
||||
self.segment_states
|
||||
.values()
|
||||
@@ -59,8 +51,7 @@ impl SegmentRegister {
|
||||
}
|
||||
|
||||
pub fn segment_metas(&self) -> Vec<SegmentMeta> {
|
||||
let mut segment_ids: Vec<SegmentMeta> = self
|
||||
.segment_states
|
||||
let mut segment_ids: Vec<SegmentMeta> = self.segment_states
|
||||
.values()
|
||||
.map(|segment_entry| segment_entry.meta().clone())
|
||||
.collect();
|
||||
@@ -68,10 +59,6 @@ impl SegmentRegister {
|
||||
segment_ids
|
||||
}
|
||||
|
||||
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
self.segment_states.get(segment_id).cloned()
|
||||
}
|
||||
|
||||
pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool {
|
||||
segment_ids
|
||||
.iter()
|
||||
@@ -94,11 +81,13 @@ impl SegmentRegister {
|
||||
.cancel_merge();
|
||||
}
|
||||
|
||||
pub fn start_merge(&mut self, segment_id: &SegmentId) {
|
||||
self.segment_states
|
||||
.get_mut(segment_id)
|
||||
.expect("Received a merge notification for a segment that is not registered")
|
||||
.start_merge();
|
||||
pub fn start_merge(&mut self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
if let Some(segment_entry) = self.segment_states.get_mut(segment_id) {
|
||||
segment_entry.start_merge();
|
||||
Some(segment_entry.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
|
||||
@@ -110,6 +99,11 @@ impl SegmentRegister {
|
||||
}
|
||||
SegmentRegister { segment_states }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
self.segment_states.get(segment_id).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -138,7 +132,7 @@ mod tests {
|
||||
let segment_id_merged = SegmentId::generate_random();
|
||||
|
||||
{
|
||||
let segment_meta = SegmentMeta::new(segment_id_a);
|
||||
let segment_meta = SegmentMeta::new(segment_id_a, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
@@ -151,7 +145,7 @@ mod tests {
|
||||
);
|
||||
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
|
||||
{
|
||||
let segment_meta = SegmentMeta::new(segment_id_b);
|
||||
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
@@ -181,7 +175,7 @@ mod tests {
|
||||
segment_register.remove_segment(&segment_id_a);
|
||||
segment_register.remove_segment(&segment_id_b);
|
||||
{
|
||||
let segment_meta_merged = SegmentMeta::new(segment_id_merged);
|
||||
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
|
||||
@@ -7,11 +7,11 @@ use core::SegmentMeta;
|
||||
use core::SerializableSegment;
|
||||
use core::META_FILEPATH;
|
||||
use directory::Directory;
|
||||
use directory::FileProtection;
|
||||
use error::{Error, ErrorKind, Result};
|
||||
use error::{Error, ErrorKind, Result, ResultExt};
|
||||
use futures::oneshot;
|
||||
use futures::sync::oneshot::Receiver;
|
||||
use futures::Future;
|
||||
use futures_cpupool::Builder as CpuPoolBuilder;
|
||||
use futures_cpupool::CpuFuture;
|
||||
use futures_cpupool::CpuPool;
|
||||
use indexer::delete_queue::DeleteCursor;
|
||||
@@ -29,8 +29,7 @@ use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use std::mem;
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::thread;
|
||||
@@ -87,38 +86,19 @@ pub fn save_metas(
|
||||
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
|
||||
|
||||
fn perform_merge(
|
||||
segment_ids: &[SegmentId],
|
||||
segment_updater: &SegmentUpdater,
|
||||
index: &Index,
|
||||
mut segment_entries: Vec<SegmentEntry>,
|
||||
mut merged_segment: Segment,
|
||||
target_opstamp: u64,
|
||||
) -> Result<SegmentEntry> {
|
||||
// first we need to apply deletes to our segment.
|
||||
info!("Start merge: {:?}", segment_ids);
|
||||
|
||||
let index = &segment_updater.0.index;
|
||||
// TODO add logging
|
||||
let schema = index.schema();
|
||||
let mut segment_entries = vec![];
|
||||
|
||||
let mut file_protections: Vec<FileProtection> = vec![];
|
||||
|
||||
for segment_id in segment_ids {
|
||||
if let Some(mut segment_entry) = segment_updater.0.segment_manager.segment_entry(segment_id)
|
||||
{
|
||||
let segment = index.segment(segment_entry.meta().clone());
|
||||
if let Some(file_protection) =
|
||||
advance_deletes(segment, &mut segment_entry, target_opstamp)?
|
||||
{
|
||||
file_protections.push(file_protection);
|
||||
}
|
||||
segment_entries.push(segment_entry);
|
||||
} else {
|
||||
error!("Error, had to abort merge as some of the segment is not managed anymore.");
|
||||
let msg = format!(
|
||||
"Segment {:?} requested for merge is not managed.",
|
||||
segment_id
|
||||
);
|
||||
bail!(ErrorKind::InvalidArgument(msg));
|
||||
}
|
||||
for segment_entry in &mut segment_entries {
|
||||
let segment = index.segment(segment_entry.meta().clone());
|
||||
advance_deletes(segment, segment_entry, target_opstamp)?;
|
||||
}
|
||||
|
||||
let delete_cursor = segment_entries[0].delete_cursor().clone();
|
||||
@@ -135,13 +115,13 @@ fn perform_merge(
|
||||
// to merge the two segments.
|
||||
|
||||
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)
|
||||
.expect("Creating index serializer failed");
|
||||
.chain_err(|| "Creating index serializer failed")?;
|
||||
|
||||
let num_docs = merger
|
||||
.write(segment_serializer)
|
||||
.expect("Serializing merged index failed");
|
||||
let mut segment_meta = SegmentMeta::new(merged_segment.id());
|
||||
segment_meta.set_max_doc(num_docs);
|
||||
.chain_err(|| "Serializing merged index failed")?;
|
||||
|
||||
let segment_meta = SegmentMeta::new(merged_segment.id(), num_docs);
|
||||
|
||||
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None);
|
||||
Ok(after_merge_segment_entry)
|
||||
@@ -167,8 +147,12 @@ impl SegmentUpdater {
|
||||
) -> Result<SegmentUpdater> {
|
||||
let segments = index.searchable_segment_metas()?;
|
||||
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
|
||||
let pool = CpuPoolBuilder::new()
|
||||
.name_prefix("segment_updater")
|
||||
.pool_size(1)
|
||||
.create();
|
||||
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
|
||||
pool: CpuPool::new(1),
|
||||
pool,
|
||||
index,
|
||||
segment_manager,
|
||||
merge_policy: RwLock::new(Box::new(DefaultMergePolicy::default())),
|
||||
@@ -283,69 +267,85 @@ impl SegmentUpdater {
}).wait()
}

pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Receiver<SegmentMeta> {
self.0.segment_manager.start_merge(segment_ids);
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
//let future_merged_segment = */
let segment_ids_vec = segment_ids.to_vec();
self.run_async(move |segment_updater| {
segment_updater.start_merge_impl(&segment_ids_vec[..])
}).wait()?
}

// `segment_ids` is required to be non-empty.
fn start_merge_impl(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty.");

let segment_updater_clone = self.clone();
let segment_entries: Vec<SegmentEntry> = self.0.segment_manager.start_merge(segment_ids)?;

let segment_ids_vec = segment_ids.to_vec();

let merging_thread_id = self.get_merging_thread_id();
info!(
"Starting merge thread #{} - {:?}",
merging_thread_id, segment_ids
);
let (merging_future_send, merging_future_recv) = oneshot();

if segment_ids.is_empty() {
return merging_future_recv;
}

let target_opstamp = self.0.stamper.stamp();
let merging_join_handle = thread::spawn(move || {
// first we need to apply deletes to our segment.
let merged_segment = segment_updater_clone.new_segment();
let merged_segment_id = merged_segment.id();
let merge_result = perform_merge(
&segment_ids_vec,
&segment_updater_clone,
merged_segment,
target_opstamp,
);

match merge_result {
Ok(after_merge_segment_entry) => {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(segment_ids_vec, after_merge_segment_entry)
.expect("Segment updater thread is corrupted.");
// first we need to apply deletes to our segment.
let merging_join_handle = thread::Builder::new()
.name(format!("mergingthread-{}", merging_thread_id))
.spawn(move || {
// first we need to apply deletes to our segment.
let merged_segment = segment_updater_clone.new_segment();
let merged_segment_id = merged_segment.id();
let merge_result = perform_merge(
&segment_updater_clone.0.index,
segment_entries,
merged_segment,
target_opstamp,
);

// the future may fail if the listener of the oneshot future
// has been destroyed.
//
// This is not a problem here, so we just ignore any
// possible error.
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(e) => {
error!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
match merge_result {
Ok(after_merge_segment_entry) => {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(segment_ids_vec, after_merge_segment_entry)
.expect("Segment updater thread is corrupted.");

// the future may fail if the listener of the oneshot future
// has been destroyed.
//
// This is not a problem here, so we just ignore any
// possible error.
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(e) => {
warn!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
// merging_future_send will be dropped, sending an error to the future.
}
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
// merging_future_send will be dropped, sending an error to the future.
}
}
segment_updater_clone
.0
.merging_threads
.write()
.unwrap()
.remove(&merging_thread_id);
Ok(())
});
segment_updater_clone
.0
.merging_threads
.write()
.unwrap()
.remove(&merging_thread_id);
Ok(())
})
.expect("Failed to spawn a thread.");
self.0
.merging_threads
.write()
.unwrap()
.insert(merging_thread_id, merging_join_handle);
merging_future_recv
Ok(merging_future_recv)
}

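The comment in the merging thread above notes that sending on `merging_future_send` can fail once the listener of the oneshot future has been dropped. For context, here is a caller-side sketch of consuming the `Receiver<SegmentMeta>` now returned by `start_merge` (illustrative only, not part of the diff; it assumes the futures 0.1 `Future::wait` API and a `segment_updater` and `segment_ids` already in scope):

use futures::Future;

// Illustrative: block until the merging thread reports the merged segment.
let merge_future = segment_updater.start_merge(&segment_ids)?;
match merge_future.wait() {
    // The merging thread sent the meta of the freshly merged segment.
    Ok(segment_meta) => info!("merge produced {:?}", segment_meta),
    // The sender was dropped: the merge was cancelled or failed.
    Err(cancelled) => warn!("merge cancelled: {:?}", cancelled),
}
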
fn consider_merge_options(&self) {
@@ -358,8 +358,18 @@ impl SegmentUpdater {
let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
for MergeCandidate(segment_metas) in merge_candidates {
if let Err(e) = self.start_merge(&segment_metas).fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
match self.start_merge_impl(&segment_metas) {
Ok(merge_future) => {
if let Err(e) = merge_future.fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
}
}
Err(err) => {
warn!(
"Starting the merge failed for the following reason. This is not fatal. {}",
err
);
}
}
}
}
@@ -382,7 +392,6 @@ impl SegmentUpdater {
self.run_async(move |segment_updater| {
info!("End merge {:?}", after_merge_segment_entry.meta());
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
let mut _file_protection_opt = None;
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater
.0
@@ -393,29 +402,22 @@ impl SegmentUpdater {
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.0.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
match advance_deletes(
segment,
&mut after_merge_segment_entry,
committed_opstamp,
) {
Ok(file_protection_opt_res) => {
_file_protection_opt = file_protection_opt_res;
}
Err(e) => {
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
before_merge_segment_ids, e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater.cancel_merge(
&before_merge_segment_ids,
after_merge_segment_entry.segment_id(),
);
return;
if let Err(e) =
advance_deletes(segment, &mut after_merge_segment_entry, committed_opstamp)
{
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
before_merge_segment_ids, e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater.cancel_merge(
&before_merge_segment_ids,
after_merge_segment_entry.segment_id(),
);
return;
}
}
}

@@ -180,6 +180,9 @@ mod macros;

pub use error::{Error, ErrorKind, ResultExt};

extern crate census;
extern crate owned_read;

/// Tantivy result.
pub type Result<T> = std::result::Result<T, Error>;

@@ -94,8 +94,7 @@ impl MultiFieldPostingsWriter {
&self,
serializer: &mut InvertedIndexSerializer,
) -> Result<HashMap<Field, HashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self
.term_index
let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self.term_index
.iter()
.map(|(term_bytes, addr, bucket_id)| (term_bytes, addr, bucket_id as UnorderedTermId))
.collect();

@@ -107,8 +107,7 @@ impl Recorder for TermFrequencyRecorder {
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
// the last document has not been closed...
// its term freq is self.current_tf.
let mut doc_iter = self
.stack
let mut doc_iter = self.stack
.iter(heap)
.chain(Some(self.current_tf).into_iter());

@@ -2,15 +2,14 @@ use compression::{BlockDecoder, CompressedIntStream, VIntDecoder, COMPRESSION_BL
use DocId;

use common::BitSet;
use common::CountingWriter;
use common::HasLen;
use compression::compressed_block_size;
use directory::{ReadOnlySource, SourceRead};
use docset::{DocSet, SkipResult};
use fst::Streamer;
use postings::serializer::PostingsSerializer;
use postings::FreqReadingOption;
use postings::Postings;
use owned_read::OwnedRead;

struct PositionComputer {
// store the amount of position int
@@ -78,9 +77,9 @@ impl SegmentPostings {
/// and returns a `SegmentPostings` object that embeds a
/// buffer with the serialized data.
pub fn create_from_docs(docs: &[u32]) -> SegmentPostings {
let mut counting_writer = CountingWriter::wrap(Vec::new());
let mut buffer = Vec::new();
{
let mut postings_serializer = PostingsSerializer::new(&mut counting_writer, false);
let mut postings_serializer = PostingsSerializer::new(&mut buffer, false);
for &doc in docs {
postings_serializer.write_doc(doc, 1u32).unwrap();
}
@@ -88,13 +87,9 @@ impl SegmentPostings {
.close_term()
.expect("In memory Serialization should never fail.");
}
let (buffer, _) = counting_writer
.finish()
.expect("Serializing in a buffer should never fail.");
let data = ReadOnlySource::from(buffer);
let block_segment_postings = BlockSegmentPostings::from_data(
docs.len(),
SourceRead::from(data),
OwnedRead::new(buffer),
FreqReadingOption::NoFreq,
);
SegmentPostings::from_block_postings(block_segment_postings, None)
@@ -306,13 +301,13 @@ pub struct BlockSegmentPostings {
doc_offset: DocId,
num_bitpacked_blocks: usize,
num_vint_docs: usize,
remaining_data: SourceRead,
remaining_data: OwnedRead,
}

impl BlockSegmentPostings {
pub(crate) fn from_data(
doc_freq: usize,
data: SourceRead,
data: OwnedRead,
freq_reading_option: FreqReadingOption,
) -> BlockSegmentPostings {
let num_bitpacked_blocks: usize = (doc_freq as usize) / COMPRESSION_BLOCK_SIZE;
@@ -339,7 +334,7 @@ impl BlockSegmentPostings {
// # Warning
//
// This does not reset the positions list.
pub(crate) fn reset(&mut self, doc_freq: usize, postings_data: SourceRead) {
pub(crate) fn reset(&mut self, doc_freq: usize, postings_data: OwnedRead) {
let num_binpacked_blocks: usize = doc_freq / COMPRESSION_BLOCK_SIZE;
let num_vint_docs = doc_freq & (COMPRESSION_BLOCK_SIZE - 1);
self.num_bitpacked_blocks = num_binpacked_blocks;
@@ -399,8 +394,7 @@ impl BlockSegmentPostings {
/// Returns false iff there was no remaining blocks.
pub fn advance(&mut self) -> bool {
if self.num_bitpacked_blocks > 0 {
let num_consumed_bytes = self
.doc_decoder
let num_consumed_bytes = self.doc_decoder
.uncompress_block_sorted(self.remaining_data.as_ref(), self.doc_offset);
self.remaining_data.advance(num_consumed_bytes);
match self.freq_reading_option {
@@ -410,8 +404,7 @@ impl BlockSegmentPostings {
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self
.freq_decoder
let num_consumed_bytes = self.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref());
self.remaining_data.advance(num_consumed_bytes);
}
@@ -451,7 +444,7 @@ impl BlockSegmentPostings {
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option: FreqReadingOption::NoFreq,

remaining_data: From::from(ReadOnlySource::empty()),
remaining_data: OwnedRead::new(vec![]),
doc_offset: 0,
doc_freq: 0,
}

@@ -160,8 +160,7 @@ impl<'a> FieldSerializer<'a> {
}

fn current_term_info(&self) -> TermInfo {
let (filepos, offset) = self
.positions_serializer_opt
let (filepos, offset) = self.positions_serializer_opt
.as_ref()
.map(|positions_serializer| positions_serializer.addr())
.unwrap_or((0u64, 0u8));
@@ -240,13 +239,60 @@ impl<'a> FieldSerializer<'a> {
}
}

struct Block {
doc_ids: [DocId; COMPRESSION_BLOCK_SIZE],
term_freqs: [u32; COMPRESSION_BLOCK_SIZE],
len: usize
}

impl Block {
fn new() -> Self {
Block {
doc_ids: [0u32; COMPRESSION_BLOCK_SIZE],
term_freqs: [0u32; COMPRESSION_BLOCK_SIZE],
len: 0
}
}

fn doc_ids(&self) -> &[DocId] {
&self.doc_ids[..self.len]
}

fn term_freqs(&self) -> &[u32] {
&self.term_freqs[..self.len]
}

fn clear(&mut self) {
self.len = 0;
}

fn append_doc(&mut self, doc: DocId, term_freq: u32) {
let len = self.len;
self.doc_ids[len] = doc;
self.term_freqs[len] = term_freq;
self.len = len + 1;
}

fn is_full(&self) -> bool {
self.len == COMPRESSION_BLOCK_SIZE
}

fn is_empty(&self) -> bool {
self.len == 0
}

fn last_doc(&self) -> DocId {
assert_eq!(self.len, COMPRESSION_BLOCK_SIZE);
self.doc_ids[COMPRESSION_BLOCK_SIZE - 1]
}
}

pub struct PostingsSerializer<W: Write> {
postings_write: CountingWriter<W>,
last_doc_id_encoded: u32,

block_encoder: BlockEncoder,
doc_ids: Vec<DocId>,
term_freqs: Vec<u32>,
block: Box<Block>,

termfreq_enabled: bool,
}
@@ -257,42 +303,41 @@ impl<W: Write> PostingsSerializer<W> {
postings_write: CountingWriter::wrap(write),

block_encoder: BlockEncoder::new(),
doc_ids: vec![],
term_freqs: vec![],
block: Box::new(Block::new()),

last_doc_id_encoded: 0u32,
termfreq_enabled,
}
}

pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) -> io::Result<()> {
self.doc_ids.push(doc_id);
if self.termfreq_enabled {
self.term_freqs.push(term_freq as u32);
fn write_block(&mut self) -> io::Result<()> {
{
// encode the doc ids
let block_encoded: &[u8] = self.block_encoder
.compress_block_sorted(&self.block.doc_ids(), self.last_doc_id_encoded);
self.last_doc_id_encoded = self.block.last_doc();
self.postings_write.write_all(block_encoded)?;
}
if self.doc_ids.len() == COMPRESSION_BLOCK_SIZE {
{
// encode the doc ids
let block_encoded: &[u8] = self
.block_encoder
.compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1];
self.postings_write.write_all(block_encoded)?;
}
if self.termfreq_enabled {
// encode the term_freqs
let block_encoded: &[u8] =
self.block_encoder.compress_block_unsorted(&self.term_freqs);
self.postings_write.write_all(block_encoded)?;
self.term_freqs.clear();
}
self.doc_ids.clear();
if self.termfreq_enabled {
// encode the term_freqs
let block_encoded: &[u8] =
self.block_encoder.compress_block_unsorted(&self.block.term_freqs());
self.postings_write.write_all(block_encoded)?;
}
self.block.clear();
Ok(())
}

pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) -> io::Result<()> {
self.block.append_doc(doc_id, term_freq);
if self.block.is_full() {
self.write_block()?;
}
Ok(())
}

pub fn close_term(&mut self) -> io::Result<()> {
if !self.doc_ids.is_empty() {
if !self.block.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
// not a perfect multiple of our block size.
@@ -300,20 +345,17 @@ impl<W: Write> PostingsSerializer<W> {
// In that case, the remaining part is encoded
// using variable int encoding.
{
let block_encoded = self
.block_encoder
.compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded);
let block_encoded = self.block_encoder
.compress_vint_sorted(&self.block.doc_ids(), self.last_doc_id_encoded);
self.postings_write.write_all(block_encoded)?;
self.doc_ids.clear();
}
// ... Idem for term frequencies
if self.termfreq_enabled {
let block_encoded = self
.block_encoder
.compress_vint_unsorted(&self.term_freqs[..]);
let block_encoded = self.block_encoder
.compress_vint_unsorted(self.block.term_freqs());
self.postings_write.write_all(block_encoded)?;
self.term_freqs.clear();
}
self.block.clear();
}
Ok(())
}
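The comments in `close_term` above describe the encoding split: full blocks of `COMPRESSION_BLOCK_SIZE` documents are bit-packed by `write_block`, and the trailing remainder is written with variable-length integers. A small worked example of that arithmetic (illustrative only; it assumes `COMPRESSION_BLOCK_SIZE` is 128, the block size used by tantivy's compression module):

// For a posting list of 300 documents:
let doc_freq = 300usize;
let block_size = 128usize; // assumed value of COMPRESSION_BLOCK_SIZE
let num_bitpacked_blocks = doc_freq / block_size; // 2 full bit-packed blocks
let num_vint_docs = doc_freq % block_size; // 44 trailing docs encoded as vints by close_term
assert_eq!((num_bitpacked_blocks, num_vint_docs), (2, 44));
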
@@ -327,8 +369,7 @@ impl<W: Write> PostingsSerializer<W> {
}

fn clear(&mut self) {
self.doc_ids.clear();
self.term_freqs.clear();
self.block.clear();
self.last_doc_id_encoded = 0;
}
}

@@ -41,8 +41,7 @@ impl From<Vec<(Occur, Box<Query>)>> for BooleanQuery {

impl Query for BooleanQuery {
fn weight(&self, searcher: &Searcher, scoring_enabled: bool) -> Result<Box<Weight>> {
let sub_weights = self
.subqueries
let sub_weights = self.subqueries
.iter()
.map(|&(ref occur, ref subquery)| {
Ok((*occur, subquery.weight(searcher, scoring_enabled)?))

@@ -1,11 +1,21 @@
use query::Occur;
use schema::Field;
use schema::Term;
use schema::Type;
use std::fmt;
use std::ops::Bound;

#[derive(Clone)]
pub enum LogicalLiteral {
Term(Term),
Phrase(Vec<Term>),
Range {
field: Field,
value_type: Type,
lower: Bound<Term>,
upper: Bound<Term>,
},
All,
}

#[derive(Clone)]
@@ -54,6 +64,12 @@ impl fmt::Debug for LogicalLiteral {
match *self {
LogicalLiteral::Term(ref term) => write!(formatter, "{:?}", term),
LogicalLiteral::Phrase(ref terms) => write!(formatter, "\"{:?}\"", terms),
LogicalLiteral::Range {
ref lower,
ref upper,
..
} => write!(formatter, "({:?} TO {:?})", lower, upper),
LogicalLiteral::All => write!(formatter, "*"),
}
}
}

@@ -1,29 +1,37 @@
use super::user_input_ast::*;
use combine::char::*;
use combine::*;
use query::query_parser::user_input_ast::UserInputBound;

fn field<I: Stream<Item = char>>() -> impl Parser<Input = I, Output = String> {
(
letter(),
many(satisfy(|c: char| c.is_alphanumeric() || c == '_')),
).map(|(s1, s2): (char, String)| format!("{}{}", s1, s2))
}

fn word<I: Stream<Item = char>>() -> impl Parser<Input = I, Output = String> {
many1(satisfy(|c: char| c.is_alphanumeric()))
}

fn negative_number<I: Stream<Item = char>>() -> impl Parser<Input = I, Output = String> {
(char('-'), many1(satisfy(|c: char| c.is_numeric())))
.map(|(s1, s2): (char, String)| format!("{}{}", s1, s2))
}

fn literal<I>(input: I) -> ParseResult<UserInputAST, I>
where
I: Stream<Item = char>,
{
let term_val = || {
let word = many1(satisfy(|c: char| c.is_alphanumeric()));
let phrase = (char('"'), many1(satisfy(|c| c != '"')), char('"')).map(|(_, s, _)| s);
phrase.or(word)
phrase.or(word())
};

let negative_numbers = (char('-'), many1(satisfy(|c: char| c.is_numeric())))
.map(|(s1, s2): (char, String)| format!("{}{}", s1, s2));

let field = (
letter(),
many(satisfy(|c: char| c.is_alphanumeric() || c == '_')),
).map(|(s1, s2): (char, String)| format!("{}{}", s1, s2));

let term_val_with_field = negative_numbers.or(term_val());
let term_val_with_field = negative_number().or(term_val());

let term_query =
(field, char(':'), term_val_with_field).map(|(field_name, _, phrase)| UserInputLiteral {
(field(), char(':'), term_val_with_field).map(|(field_name, _, phrase)| UserInputLiteral {
field_name: Some(field_name),
phrase,
});
@@ -37,6 +45,36 @@ where
.parse_stream(input)
}

fn range<I: Stream<Item = char>>(input: I) -> ParseResult<UserInputAST, I> {
let term_val = || {
word().or(negative_number()).or(char('*').map(|_| "*".to_string()))
};
let lower_bound = {
let excl = (char('{'), term_val()).map(|(_, w)| UserInputBound::Exclusive(w));
let incl = (char('['), term_val()).map(|(_, w)| UserInputBound::Inclusive(w));
excl.or(incl)
};
let upper_bound = {
let excl = (term_val(), char('}')).map(|(w, _)| UserInputBound::Exclusive(w));
let incl = (term_val(), char(']')).map(|(w, _)| UserInputBound::Inclusive(w));
// TODO: this backtracking should be unnecessary
try(excl).or(incl)
};
(
optional((field(), char(':')).map(|x| x.0)),
lower_bound,
spaces(),
string("TO"),
spaces(),
upper_bound,
).map(|(field, lower, _, _, _, upper)| UserInputAST::Range {
field,
lower,
upper,
})
.parse_stream(input)
}

fn leaf<I>(input: I) -> ParseResult<UserInputAST, I>
where
I: Stream<Item = char>,
@@ -45,6 +83,8 @@ where
.map(|(_, expr)| UserInputAST::Not(Box::new(expr)))
.or((char('+'), parser(leaf)).map(|(_, expr)| UserInputAST::Must(Box::new(expr))))
.or((char('('), parser(parse_to_ast), char(')')).map(|(_, expr, _)| expr))
.or(char('*').map(|_| UserInputAST::All))
.or(try(parser(range)))
.or(parser(literal))
.parse_stream(input)
}
@@ -91,6 +131,12 @@ mod test {
test_parse_query_to_ast_helper("-abc:toto", "-(abc:\"toto\")");
test_parse_query_to_ast_helper("abc:a b", "(abc:\"a\" \"b\")");
test_parse_query_to_ast_helper("abc:\"a b\"", "abc:\"a b\"");
test_parse_query_to_ast_helper("foo:[1 TO 5]", "foo:[\"1\" TO \"5\"]");
test_parse_query_to_ast_helper("[1 TO 5]", "[\"1\" TO \"5\"]");
test_parse_query_to_ast_helper("foo:{a TO z}", "foo:{\"a\" TO \"z\"}");
test_parse_query_to_ast_helper("foo:[1 TO toto}", "foo:[\"1\" TO \"toto\"}");
test_parse_query_to_ast_helper("foo:[* TO toto}", "foo:[\"*\" TO \"toto\"}");
test_parse_query_to_ast_helper("foo:[1 TO *}", "foo:[\"1\" TO \"*\"}");
test_is_parse_err("abc + ");
}
}

@@ -2,15 +2,19 @@ use super::logical_ast::*;
use super::query_grammar::parse_to_ast;
use super::user_input_ast::*;
use core::Index;
use query::AllQuery;
use query::BooleanQuery;
use query::Occur;
use query::PhraseQuery;
use query::Query;
use query::RangeQuery;
use query::TermQuery;
use schema::IndexRecordOption;
use schema::{Field, Schema};
use schema::{FieldType, Term};
use std::borrow::Cow;
use std::num::ParseIntError;
use std::ops::Bound;
use std::str::FromStr;
use tokenizer::TokenizerManager;

@@ -39,6 +43,9 @@ pub enum QueryParserError {
/// The tokenizer for the given field is unknown
/// The two argument strings are the name of the field, the name of the tokenizer
UnknownTokenizer(String, String),
/// The query contains a range query with a phrase as one of the bounds.
/// Only terms can be used as bounds.
RangeMustNotHavePhrase,
}

impl From<ParseIntError> for QueryParserError {
@@ -66,8 +73,8 @@ impl From<ParseIntError> for QueryParserError {
/// by relevance : The user typically just scans through the first few
/// documents in order of decreasing relevance and will stop when the documents
/// are not relevant anymore.
/// Making it possible to make this behavior customizable is tracked in
/// [issue #27](https://github.com/fulmicoton/tantivy/issues/27).
///
/// Switching to a default of `AND` can be done by calling `.set_conjunction_by_default()`.
///
/// * negative terms: By prepending a term by a `-`, a term can be excluded
/// from the search. This is useful for disambiguating a query.
@@ -75,6 +82,17 @@ impl From<ParseIntError> for QueryParserError {
///
/// * must terms: By prepending a term by a `+`, a term can be made required for the search.
///
/// * phrase terms: Quoted terms become phrase searches on fields that have positions indexed.
/// e.g., `title:"Barack Obama"` will only find documents that have "barack" immediately followed
/// by "obama".
///
/// * range terms: Range searches can be done by specifying the start and end bound. These can be
/// inclusive or exclusive. e.g., `title:[a TO c}` will find all documents whose title contains
/// a word lexicographically between `a` and `c` (inclusive lower bound, exclusive upper bound).
/// Inclusive bounds are `[]`, exclusive are `{}`.
///
/// * all docs query: A plain `*` will match all documents in the index.
///
pub struct QueryParser {
schema: Schema,
default_fields: Vec<Field>,
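The doc comment above lists the syntax the parser now accepts, including range and all-docs queries. A short usage sketch (illustrative only, not part of the diff; it assumes the `QueryParser::for_index` convenience constructor, an existing `index`, and an indexed text field handle `title`):

use tantivy::query::QueryParser;

let query_parser = QueryParser::for_index(&index, vec![title]);
// Inclusive lower bound, exclusive upper bound, as described above.
let range_query = query_parser.parse_query("title:[a TO c}")?;
// A bare `*` parses to an AllQuery matching every document.
let all_query = query_parser.parse_query("*")?;
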
@@ -155,11 +173,12 @@ impl QueryParser {
}
Ok(ast)
}
fn compute_logical_ast_for_leaf(

fn compute_terms_for_string(
&self,
field: Field,
phrase: &str,
) -> Result<Option<LogicalLiteral>, QueryParserError> {
) -> Result<Vec<Term>, QueryParserError> {
let field_entry = self.schema.get_field_entry(field);
let field_type = field_entry.field_type();
if !field_type.is_indexed() {
@@ -170,12 +189,12 @@ impl QueryParser {
FieldType::I64(_) => {
let val: i64 = i64::from_str(phrase)?;
let term = Term::from_field_i64(field, val);
Ok(Some(LogicalLiteral::Term(term)))
Ok(vec![term])
}
FieldType::U64(_) => {
let val: u64 = u64::from_str(phrase)?;
let term = Term::from_field_u64(field, val);
Ok(Some(LogicalLiteral::Term(term)))
Ok(vec![term])
}
FieldType::Str(ref str_options) => {
if let Some(option) = str_options.get_indexing_options() {
@@ -194,17 +213,15 @@ impl QueryParser {
terms.push(term);
});
if terms.is_empty() {
Ok(None)
Ok(vec![])
} else if terms.len() == 1 {
Ok(Some(LogicalLiteral::Term(
terms.into_iter().next().unwrap(),
)))
Ok(terms)
} else {
let field_entry = self.schema.get_field_entry(field);
let field_type = field_entry.field_type();
if let Some(index_record_option) = field_type.get_index_record_option() {
if index_record_option.has_positions() {
Ok(Some(LogicalLiteral::Phrase(terms)))
Ok(terms)
} else {
let fieldname = self.schema.get_field_name(field).to_string();
Err(QueryParserError::FieldDoesNotHavePositionsIndexed(
@@ -223,10 +240,7 @@ impl QueryParser {
))
}
}
FieldType::HierarchicalFacet => {
let term = Term::from_field_text(field, phrase);
Ok(Some(LogicalLiteral::Term(term)))
}
FieldType::HierarchicalFacet => Ok(vec![Term::from_field_text(field, phrase)]),
FieldType::Bytes => {
let field_name = self.schema.get_field_name(field).to_string();
Err(QueryParserError::FieldNotIndexed(field_name))
@@ -234,6 +248,21 @@ impl QueryParser {
}
}

fn compute_logical_ast_for_leaf(
&self,
field: Field,
phrase: &str,
) -> Result<Option<LogicalLiteral>, QueryParserError> {
let terms = self.compute_terms_for_string(field, phrase)?;
match terms.len() {
0 => Ok(None),
1 => Ok(Some(LogicalLiteral::Term(
terms.into_iter().next().unwrap(),
))),
_ => Ok(Some(LogicalLiteral::Phrase(terms))),
}
}

fn default_occur(&self) -> Occur {
if self.conjunction_by_default {
Occur::Must
@@ -242,6 +271,37 @@ impl QueryParser {
}
}

fn resolve_bound(&self, field: Field, bound: &UserInputBound) -> Result<Bound<Term>, QueryParserError> {
if bound.term_str() == "*" {
return Ok(Bound::Unbounded);
}
let terms = self.compute_terms_for_string(field, bound.term_str())?;
if terms.len() != 1 {
return Err(QueryParserError::RangeMustNotHavePhrase);
}
let term = terms.into_iter().next().unwrap();
match *bound {
UserInputBound::Inclusive(_) => Ok(Bound::Included(term)),
UserInputBound::Exclusive(_) => Ok(Bound::Excluded(term)),
}
}

fn resolved_fields(
&self,
given_field: &Option<String>,
) -> Result<Cow<[Field]>, QueryParserError> {
match *given_field {
None => {
if self.default_fields.is_empty() {
Err(QueryParserError::NoDefaultFieldDeclared)
} else {
Ok(Cow::from(&self.default_fields[..]))
}
}
Some(ref field) => Ok(Cow::from(vec![self.resolve_field_name(&*field)?])),
}
}

fn compute_logical_ast_with_occur(
&self,
user_input_ast: UserInputAST,
@@ -265,6 +325,41 @@ impl QueryParser {
let (occur, logical_sub_queries) = self.compute_logical_ast_with_occur(*subquery)?;
Ok((compose_occur(Occur::Must, occur), logical_sub_queries))
}
UserInputAST::Range {
field,
lower,
upper,
} => {
let fields = self.resolved_fields(&field)?;
let mut clauses = fields
.iter()
.map(|&field| {
let field_entry = self.schema.get_field_entry(field);
let value_type = field_entry.field_type().value_type();
Ok(LogicalAST::Leaf(Box::new(LogicalLiteral::Range {
field,
value_type,
lower: self.resolve_bound(field, &lower)?,
upper: self.resolve_bound(field, &upper)?,
})))
})
.collect::<Result<Vec<_>, QueryParserError>>()?;
let result_ast = if clauses.len() == 1 {
clauses.pop().unwrap()
} else {
LogicalAST::Clause(
clauses
.into_iter()
.map(|clause| (Occur::Should, clause))
.collect(),
)
};
Ok((Occur::Should, result_ast))
}
UserInputAST::All => Ok((
Occur::Should,
LogicalAST::Leaf(Box::new(LogicalLiteral::All)),
)),
UserInputAST::Leaf(literal) => {
let term_phrases: Vec<(Field, String)> = match literal.field_name {
Some(ref field_name) => {
@@ -327,6 +422,13 @@ fn convert_literal_to_query(logical_literal: LogicalLiteral) -> Box<Query> {
match logical_literal {
LogicalLiteral::Term(term) => Box::new(TermQuery::new(term, IndexRecordOption::WithFreqs)),
LogicalLiteral::Phrase(terms) => Box::new(PhraseQuery::new(terms)),
LogicalLiteral::Range {
field,
value_type,
lower,
upper,
} => Box::new(RangeQuery::new_term_bounds(field, value_type, lower, upper)),
LogicalLiteral::All => Box::new(AllQuery),
}
}

@@ -511,6 +613,42 @@ mod test {
Term([0, 0, 0, 0, 98])]\"",
false,
);
test_parse_query_to_logical_ast_helper(
"title:[a TO b]",
"(Included(Term([0, 0, 0, 0, 97])) TO \
Included(Term([0, 0, 0, 0, 98])))",
false,
);
test_parse_query_to_logical_ast_helper(
"[a TO b]",
"((Included(Term([0, 0, 0, 0, 97])) TO \
Included(Term([0, 0, 0, 0, 98]))) \
(Included(Term([0, 0, 0, 1, 97])) TO \
Included(Term([0, 0, 0, 1, 98]))))",
false,
);
test_parse_query_to_logical_ast_helper(
"title:{titi TO toto}",
"(Excluded(Term([0, 0, 0, 0, 116, 105, 116, 105])) TO \
Excluded(Term([0, 0, 0, 0, 116, 111, 116, 111])))",
false,
);
test_parse_query_to_logical_ast_helper(
"title:{* TO toto}",
"(Unbounded TO \
Excluded(Term([0, 0, 0, 0, 116, 111, 116, 111])))",
false,
);
test_parse_query_to_logical_ast_helper(
"title:{titi TO *}",
"(Excluded(Term([0, 0, 0, 0, 116, 105, 116, 105])) TO Unbounded)",
false,
);
test_parse_query_to_logical_ast_helper(
"*",
"*",
false,
);
}

#[test]

@@ -14,10 +14,44 @@ impl fmt::Debug for UserInputLiteral {
}
}

pub enum UserInputBound {
Inclusive(String),
Exclusive(String),
}

impl UserInputBound {
fn display_lower(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
UserInputBound::Inclusive(ref word) => write!(formatter, "[\"{}\"", word),
UserInputBound::Exclusive(ref word) => write!(formatter, "{{\"{}\"", word),
}
}

fn display_upper(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
UserInputBound::Inclusive(ref word) => write!(formatter, "\"{}\"]", word),
UserInputBound::Exclusive(ref word) => write!(formatter, "\"{}\"}}", word),
}
}

pub fn term_str(&self) -> &str {
match *self {
UserInputBound::Inclusive(ref contents) => contents,
UserInputBound::Exclusive(ref contents) => contents,
}
}
}

pub enum UserInputAST {
Clause(Vec<Box<UserInputAST>>),
Not(Box<UserInputAST>),
Must(Box<UserInputAST>),
Range {
field: Option<String>,
lower: UserInputBound,
upper: UserInputBound,
},
All,
Leaf(Box<UserInputLiteral>),
}

@@ -45,6 +79,20 @@ impl fmt::Debug for UserInputAST {
Ok(())
}
UserInputAST::Not(ref subquery) => write!(formatter, "-({:?})", subquery),
UserInputAST::Range {
ref field,
ref lower,
ref upper,
} => {
if let &Some(ref field) = field {
write!(formatter, "{}:", field)?;
}
lower.display_lower(formatter)?;
write!(formatter, " TO ")?;
upper.display_upper(formatter)?;
Ok(())
}
UserInputAST::All => write!(formatter, "*"),
UserInputAST::Leaf(ref subquery) => write!(formatter, "{:?}", subquery),
}
}

@@ -89,6 +89,28 @@ pub struct RangeQuery {
}

impl RangeQuery {
/// Creates a new `RangeQuery` from bounded start and end terms.
///
/// If the value type is not correct, something may go terribly wrong when
/// the `Weight` object is created.
pub fn new_term_bounds(
field: Field,
value_type: Type,
left_bound: Bound<Term>,
right_bound: Bound<Term>,
) -> RangeQuery {
let verify_and_unwrap_term = |val: &Term| {
assert_eq!(field, val.field());
val.value_bytes().to_owned()
};
RangeQuery {
field,
value_type,
left_bound: map_bound(&left_bound, &verify_and_unwrap_term),
right_bound: map_bound(&right_bound, &verify_and_unwrap_term),
}
}

/// Creates a new `RangeQuery` over a `i64` field.
///
/// If the field is not of the type `i64`, tantivy

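A brief sketch of calling the new constructor directly (illustrative only, not part of the diff; `title` is an assumed text field handle, and `Type::Str` stands in for the value type that real callers read from the schema):

use std::ops::Bound;

// Both terms must belong to the field passed in, per the assert_eq! above.
let lower = Bound::Included(Term::from_field_text(title, "a"));
let upper = Bound::Excluded(Term::from_field_text(title, "c"));
// Matches every title term lexicographically in [a, c).
let query = RangeQuery::new_term_bounds(title, Type::Str, lower, upper);
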
@@ -72,8 +72,7 @@ impl<T: BinarySerializable> SkipListBuilder<T> {
let mut skip_pointer = self.data_layer.insert(key, dest)?;
loop {
skip_pointer = match skip_pointer {
Some((skip_doc_id, skip_offset)) => self
.get_skip_layer(layer_id)
Some((skip_doc_id, skip_offset)) => self.get_skip_layer(layer_id)
.insert(skip_doc_id, &skip_offset)?,
None => {
return Ok(());

@@ -164,8 +164,7 @@ impl TermDictionary {
let fst = self.fst_index.as_fst();
let mut node = fst.root();
while ord != 0 || !node.is_final() {
if let Some(transition) = node
.transitions()
if let Some(transition) = node.transitions()
.take_while(|transition| transition.out.value() <= ord)
.last()
{