Compare commits

..

1 Commits

Author SHA1 Message Date
Paul Masurel
79894657df Address #656
Broke the reference loop to make sure that the watch_router can
be dropped, and the thread exits.
2019-09-30 12:54:02 +09:00
9 changed files with 40 additions and 93 deletions

View File

@@ -3,6 +3,12 @@ Tantivy 0.11.0
- Added f64 field. Internally reuse u64 code the same way i64 does (@fdb-hiroshima)
Tantivy 0.10.2
=====================
- Closes #656. Solving memory leak.
Tantivy 0.10.1
=====================

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.10.1"
version = "0.10.2"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -98,4 +98,4 @@ features = ["failpoints"]
[[test]]
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"]
required-features = ["fail/failpoints"]

View File

@@ -13,7 +13,6 @@ use crate::Result;
use crate::Score;
use crate::SegmentLocalId;
use crate::SegmentReader;
use std::fmt;
/// The Top Score Collector keeps track of the K documents
/// sorted by their score.
@@ -69,12 +68,6 @@ use std::fmt;
/// ```
pub struct TopDocs(TopCollector<Score>);
impl fmt::Debug for TopDocs {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "TopDocs({})", self.0.limit())
}
}
impl TopDocs {
/// Creates a top score collector, with a number of documents equal to "limit".
///

View File

@@ -141,42 +141,28 @@ impl MmapCache {
}
}
struct InnerWatcherWrapper {
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: WatchCallbackList,
}
impl InnerWatcherWrapper {
pub fn new(path: &Path) -> Result<(Self, Receiver<notify::RawEvent>), notify::Error> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let mut watcher = notify::raw_watcher(tx)?;
watcher.watch(path, RecursiveMode::Recursive)?;
let inner = InnerWatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router: Default::default(),
};
Ok((inner, watcher_recv))
}
}
#[derive(Clone)]
struct WatcherWrapper {
inner: Arc<InnerWatcherWrapper>,
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: Arc<WatchCallbackList>,
}
impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
let (inner, watcher_recv) = InnerWatcherWrapper::new(path).map_err(|err| match err {
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
_ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_wrapper = WatcherWrapper {
inner: Arc::new(inner),
};
let watcher_wrapper_clone = watcher_wrapper.clone();
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let watcher = notify::raw_watcher(tx)
.and_then(|mut watcher| {
watcher.watch(path, RecursiveMode::Recursive)?;
Ok(watcher)
})
.map_err(|err| match err {
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
_ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router_clone = watcher_router.clone();
thread::Builder::new()
.name("meta-file-watch-thread".to_string())
.spawn(move || {
@@ -187,7 +173,7 @@ impl WatcherWrapper {
// We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH {
watcher_wrapper_clone.inner.watcher_router.broadcast();
watcher_router_clone.broadcast();
}
}
}
@@ -200,13 +186,15 @@ impl WatcherWrapper {
}
}
}
})
.expect("Failed to spawn thread to watch meta.json");
Ok(watcher_wrapper)
})?;
Ok(WatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router,
})
}
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
self.inner.watcher_router.subscribe(watch_callback)
self.watcher_router.subscribe(watch_callback)
}
}

View File

@@ -171,16 +171,16 @@ pub use self::snippet::{Snippet, SnippetGenerator};
mod docset;
pub use self::docset::{DocSet, SkipResult};
pub use crate::common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
pub use crate::core::SegmentComponent;
pub use crate::core::{Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{InvertedIndexReader, SegmentReader};
pub use crate::directory::Directory;
pub use crate::indexer::IndexWriter;
pub use crate::postings::Postings;
pub use crate::reader::LeasedItem;
pub use crate::schema::{Document, Term};
pub use crate::common::{i64_to_u64, u64_to_i64, f64_to_u64, u64_to_f64};
/// Expose the current version of tantivy, as well
/// whether it was compiled with the simd compression.
pub fn version() -> &'static str {
@@ -849,8 +849,7 @@ mod tests {
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 50_000_000).unwrap();
{
let document =
doc!(fast_field_unsigned => 4u64, fast_field_signed=>4i64, fast_field_float=>4f64);
let document = doc!(fast_field_unsigned => 4u64, fast_field_signed=>4i64, fast_field_float=>4f64);
index_writer.add_document(document);
index_writer.commit().unwrap();
}

View File

@@ -18,56 +18,42 @@ use crate::schema::{FieldType, Term};
use crate::tokenizer::TokenizerManager;
use combine::Parser;
use std::borrow::Cow;
use std::num::{ParseFloatError, ParseIntError};
use std::num::{ParseIntError, ParseFloatError};
use std::ops::Bound;
use std::str::FromStr;
/// Possible error that may happen when parsing a query.
#[derive(Debug, PartialEq, Eq, Fail)]
#[derive(Debug, PartialEq, Eq)]
pub enum QueryParserError {
/// Error in the query syntax
#[fail(display = "Syntax Error")]
SyntaxError,
/// `FieldDoesNotExist(field_name: String)`
/// The query references a field that is not in the schema
#[fail(display = "File does not exists: '{:?}'", _0)]
FieldDoesNotExist(String),
/// The query contains a term for a `u64` or `i64`-field, but the value
/// is neither.
#[fail(display = "Expected a valid integer: '{:?}'", _0)]
ExpectedInt(ParseIntError),
/// The query contains a term for a `f64`-field, but the value
/// is not a f64.
#[fail(display = "Invalid query: Only excluding terms given")]
ExpectedFloat(ParseFloatError),
/// It is forbidden queries that are only "excluding". (e.g. -title:pop)
#[fail(display = "Invalid query: Only excluding terms given")]
AllButQueryForbidden,
/// If no default field is declared, running a query without any
/// field specified is forbbidden.
#[fail(display = "No default field declared and no field specified in query")]
NoDefaultFieldDeclared,
/// The field searched for is not declared
/// as indexed in the schema.
#[fail(display = "The field '{:?}' is not declared as indexed", _0)]
FieldNotIndexed(String),
/// A phrase query was requested for a field that does not
/// have any positions indexed.
#[fail(display = "The field '{:?}' does not have positions indexed", _0)]
FieldDoesNotHavePositionsIndexed(String),
/// The tokenizer for the given field is unknown
/// The two argument strings are the name of the field, the name of the tokenizer
#[fail(
display = "The tokenizer '{:?}' for the field '{:?}' is unknown",
_0, _1
)]
UnknownTokenizer(String, String),
/// The query contains a range query with a phrase as one of the bounds.
/// Only terms can be used as bounds.
#[fail(display = "A range query cannot have a phrase as one of the bounds")]
RangeMustNotHavePhrase,
/// The format for the date field is not RFC 3339 compliant.
#[fail(display = "The date field has an invalid format")]
DateFormatError(chrono::ParseError),
}

View File

@@ -1,7 +1,6 @@
mod pool;
pub use self::pool::LeasedItem;
use self::pool::Pool;
use self::pool::{LeasedItem, Pool};
use crate::core::Segment;
use crate::directory::Directory;
use crate::directory::WatchHandle;

View File

@@ -123,10 +123,6 @@ impl<T> Pool<T> {
}
}
/// A LeasedItem holds an object borrowed from a Pool.
///
/// Upon drop, the object is automatically returned
/// into the pool.
pub struct LeasedItem<T> {
gen_item: Option<GenerationItem<T>>,
recycle_queue: Arc<Queue<GenerationItem<T>>>,

View File

@@ -261,24 +261,6 @@ impl Schema {
NamedFieldDocument(field_map)
}
/// Converts a named doc into a document.
pub fn from_named_doc(
&self,
named_doc: NamedFieldDocument,
) -> Result<Document, DocParsingError> {
let mut doc = Document::default();
for (field_name, field_values) in named_doc.0 {
if let Some(field) = self.get_field(&field_name) {
for field_value in field_values {
doc.add(FieldValue::new(field, field_value));
}
} else {
return Err(DocParsingError::NoSuchFieldInSchema(field_name.clone()));
}
}
Ok(doc)
}
/// Encode the schema in JSON.
///
/// Encoding a document cannot fail.
@@ -297,6 +279,7 @@ impl Schema {
};
DocParsingError::NotJSON(doc_json_sample)
})?;
let mut doc = Document::default();
for (field_name, json_value) in json_obj.iter() {
match self.get_field(field_name) {
@@ -377,16 +360,13 @@ impl<'de> Deserialize<'de> for Schema {
/// Error that may happen when deserializing
/// a document from JSON.
#[derive(Debug, Fail)]
#[derive(Debug)]
pub enum DocParsingError {
/// The payload given is not valid JSON.
#[fail(display = "The provided string is not valid JSON")]
NotJSON(String),
/// One of the value node could not be parsed.
#[fail(display = "The field '{:?}' could not be parsed: {:?}", _0, _1)]
ValueError(String, ValueParsingError),
/// The json-document contains a field that is not declared in the schema.
#[fail(display = "The json-document contains an unknown field: {:?}", _0)]
NoSuchFieldInSchema(String),
}