Compare commits

..

1 Commit

Author SHA1 Message Date
Paul Masurel
79894657df Address #656
Broke the reference loop to make sure that the watch_router can
be dropped, and the thread exits.
2019-09-30 12:54:02 +09:00
9 changed files with 40 additions and 93 deletions

View File

@@ -3,6 +3,12 @@ Tantivy 0.11.0
- Added f64 field. Internally reuse u64 code the same way i64 does (@fdb-hiroshima) - Added f64 field. Internally reuse u64 code the same way i64 does (@fdb-hiroshima)
Tantivy 0.10.2
=====================
- Closes #656. Solving memory leak.
Tantivy 0.10.1 Tantivy 0.10.1
===================== =====================

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tantivy" name = "tantivy"
version = "0.10.1" version = "0.10.2"
authors = ["Paul Masurel <paul.masurel@gmail.com>"] authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT" license = "MIT"
categories = ["database-implementations", "data-structures"] categories = ["database-implementations", "data-structures"]
@@ -98,4 +98,4 @@ features = ["failpoints"]
[[test]] [[test]]
name = "failpoints" name = "failpoints"
path = "tests/failpoints/mod.rs" path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"] required-features = ["fail/failpoints"]

View File

@@ -13,7 +13,6 @@ use crate::Result;
use crate::Score; use crate::Score;
use crate::SegmentLocalId; use crate::SegmentLocalId;
use crate::SegmentReader; use crate::SegmentReader;
use std::fmt;
/// The Top Score Collector keeps track of the K documents /// The Top Score Collector keeps track of the K documents
/// sorted by their score. /// sorted by their score.
@@ -69,12 +68,6 @@ use std::fmt;
/// ``` /// ```
pub struct TopDocs(TopCollector<Score>); pub struct TopDocs(TopCollector<Score>);
impl fmt::Debug for TopDocs {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "TopDocs({})", self.0.limit())
}
}
impl TopDocs { impl TopDocs {
/// Creates a top score collector, with a number of documents equal to "limit". /// Creates a top score collector, with a number of documents equal to "limit".
/// ///

View File

@@ -141,42 +141,28 @@ impl MmapCache {
} }
} }
struct InnerWatcherWrapper {
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: WatchCallbackList,
}
impl InnerWatcherWrapper {
pub fn new(path: &Path) -> Result<(Self, Receiver<notify::RawEvent>), notify::Error> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let mut watcher = notify::raw_watcher(tx)?;
watcher.watch(path, RecursiveMode::Recursive)?;
let inner = InnerWatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router: Default::default(),
};
Ok((inner, watcher_recv))
}
}
#[derive(Clone)]
struct WatcherWrapper { struct WatcherWrapper {
inner: Arc<InnerWatcherWrapper>, _watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: Arc<WatchCallbackList>,
} }
impl WatcherWrapper { impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> { pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
let (inner, watcher_recv) = InnerWatcherWrapper::new(path).map_err(|err| match err { let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()), // We need to initialize the
_ => { let watcher = notify::raw_watcher(tx)
panic!("Unknown error while starting watching directory {:?}", path); .and_then(|mut watcher| {
} watcher.watch(path, RecursiveMode::Recursive)?;
})?; Ok(watcher)
let watcher_wrapper = WatcherWrapper { })
inner: Arc::new(inner), .map_err(|err| match err {
}; notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
let watcher_wrapper_clone = watcher_wrapper.clone(); _ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router_clone = watcher_router.clone();
thread::Builder::new() thread::Builder::new()
.name("meta-file-watch-thread".to_string()) .name("meta-file-watch-thread".to_string())
.spawn(move || { .spawn(move || {
@@ -187,7 +173,7 @@ impl WatcherWrapper {
// We might want to be more accurate than this at one point. // We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() { if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH { if filename == *META_FILEPATH {
watcher_wrapper_clone.inner.watcher_router.broadcast(); watcher_router_clone.broadcast();
} }
} }
} }
@@ -200,13 +186,15 @@ impl WatcherWrapper {
} }
} }
} }
}) })?;
.expect("Failed to spawn thread to watch meta.json"); Ok(WatcherWrapper {
Ok(watcher_wrapper) _watcher: Mutex::new(watcher),
watcher_router,
})
} }
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle { pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
self.inner.watcher_router.subscribe(watch_callback) self.watcher_router.subscribe(watch_callback)
} }
} }

View File

@@ -171,16 +171,16 @@ pub use self::snippet::{Snippet, SnippetGenerator};
mod docset; mod docset;
pub use self::docset::{DocSet, SkipResult}; pub use self::docset::{DocSet, SkipResult};
pub use crate::common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
pub use crate::core::SegmentComponent; pub use crate::core::SegmentComponent;
pub use crate::core::{Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta}; pub use crate::core::{Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{InvertedIndexReader, SegmentReader}; pub use crate::core::{InvertedIndexReader, SegmentReader};
pub use crate::directory::Directory; pub use crate::directory::Directory;
pub use crate::indexer::IndexWriter; pub use crate::indexer::IndexWriter;
pub use crate::postings::Postings; pub use crate::postings::Postings;
pub use crate::reader::LeasedItem;
pub use crate::schema::{Document, Term}; pub use crate::schema::{Document, Term};
pub use crate::common::{i64_to_u64, u64_to_i64, f64_to_u64, u64_to_f64};
/// Expose the current version of tantivy, as well /// Expose the current version of tantivy, as well
/// whether it was compiled with the simd compression. /// whether it was compiled with the simd compression.
pub fn version() -> &'static str { pub fn version() -> &'static str {
@@ -849,8 +849,7 @@ mod tests {
let index = Index::create_in_ram(schema); let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 50_000_000).unwrap(); let mut index_writer = index.writer_with_num_threads(1, 50_000_000).unwrap();
{ {
let document = let document = doc!(fast_field_unsigned => 4u64, fast_field_signed=>4i64, fast_field_float=>4f64);
doc!(fast_field_unsigned => 4u64, fast_field_signed=>4i64, fast_field_float=>4f64);
index_writer.add_document(document); index_writer.add_document(document);
index_writer.commit().unwrap(); index_writer.commit().unwrap();
} }

View File

@@ -18,56 +18,42 @@ use crate::schema::{FieldType, Term};
use crate::tokenizer::TokenizerManager; use crate::tokenizer::TokenizerManager;
use combine::Parser; use combine::Parser;
use std::borrow::Cow; use std::borrow::Cow;
use std::num::{ParseFloatError, ParseIntError}; use std::num::{ParseIntError, ParseFloatError};
use std::ops::Bound; use std::ops::Bound;
use std::str::FromStr; use std::str::FromStr;
/// Possible error that may happen when parsing a query. /// Possible error that may happen when parsing a query.
#[derive(Debug, PartialEq, Eq, Fail)] #[derive(Debug, PartialEq, Eq)]
pub enum QueryParserError { pub enum QueryParserError {
/// Error in the query syntax /// Error in the query syntax
#[fail(display = "Syntax Error")]
SyntaxError, SyntaxError,
/// `FieldDoesNotExist(field_name: String)` /// `FieldDoesNotExist(field_name: String)`
/// The query references a field that is not in the schema /// The query references a field that is not in the schema
#[fail(display = "File does not exists: '{:?}'", _0)]
FieldDoesNotExist(String), FieldDoesNotExist(String),
/// The query contains a term for a `u64` or `i64`-field, but the value /// The query contains a term for a `u64` or `i64`-field, but the value
/// is neither. /// is neither.
#[fail(display = "Expected a valid integer: '{:?}'", _0)]
ExpectedInt(ParseIntError), ExpectedInt(ParseIntError),
/// The query contains a term for a `f64`-field, but the value /// The query contains a term for a `f64`-field, but the value
/// is not a f64. /// is not a f64.
#[fail(display = "Invalid query: Only excluding terms given")]
ExpectedFloat(ParseFloatError), ExpectedFloat(ParseFloatError),
/// It is forbidden queries that are only "excluding". (e.g. -title:pop) /// It is forbidden queries that are only "excluding". (e.g. -title:pop)
#[fail(display = "Invalid query: Only excluding terms given")]
AllButQueryForbidden, AllButQueryForbidden,
/// If no default field is declared, running a query without any /// If no default field is declared, running a query without any
/// field specified is forbidden. /// field specified is forbidden.
#[fail(display = "No default field declared and no field specified in query")]
NoDefaultFieldDeclared, NoDefaultFieldDeclared,
/// The field searched for is not declared /// The field searched for is not declared
/// as indexed in the schema. /// as indexed in the schema.
#[fail(display = "The field '{:?}' is not declared as indexed", _0)]
FieldNotIndexed(String), FieldNotIndexed(String),
/// A phrase query was requested for a field that does not /// A phrase query was requested for a field that does not
/// have any positions indexed. /// have any positions indexed.
#[fail(display = "The field '{:?}' does not have positions indexed", _0)]
FieldDoesNotHavePositionsIndexed(String), FieldDoesNotHavePositionsIndexed(String),
/// The tokenizer for the given field is unknown /// The tokenizer for the given field is unknown
/// The two argument strings are the name of the field, the name of the tokenizer /// The two argument strings are the name of the field, the name of the tokenizer
#[fail(
display = "The tokenizer '{:?}' for the field '{:?}' is unknown",
_0, _1
)]
UnknownTokenizer(String, String), UnknownTokenizer(String, String),
/// The query contains a range query with a phrase as one of the bounds. /// The query contains a range query with a phrase as one of the bounds.
/// Only terms can be used as bounds. /// Only terms can be used as bounds.
#[fail(display = "A range query cannot have a phrase as one of the bounds")]
RangeMustNotHavePhrase, RangeMustNotHavePhrase,
/// The format for the date field is not RFC 3339 compliant. /// The format for the date field is not RFC 3339 compliant.
#[fail(display = "The date field has an invalid format")]
DateFormatError(chrono::ParseError), DateFormatError(chrono::ParseError),
} }

View File

@@ -1,7 +1,6 @@
mod pool; mod pool;
pub use self::pool::LeasedItem; use self::pool::{LeasedItem, Pool};
use self::pool::Pool;
use crate::core::Segment; use crate::core::Segment;
use crate::directory::Directory; use crate::directory::Directory;
use crate::directory::WatchHandle; use crate::directory::WatchHandle;

View File

@@ -123,10 +123,6 @@ impl<T> Pool<T> {
} }
} }
/// A LeasedItem holds an object borrowed from a Pool.
///
/// Upon drop, the object is automatically returned
/// into the pool.
pub struct LeasedItem<T> { pub struct LeasedItem<T> {
gen_item: Option<GenerationItem<T>>, gen_item: Option<GenerationItem<T>>,
recycle_queue: Arc<Queue<GenerationItem<T>>>, recycle_queue: Arc<Queue<GenerationItem<T>>>,

View File

@@ -261,24 +261,6 @@ impl Schema {
NamedFieldDocument(field_map) NamedFieldDocument(field_map)
} }
/// Converts a named doc into a document.
pub fn from_named_doc(
&self,
named_doc: NamedFieldDocument,
) -> Result<Document, DocParsingError> {
let mut doc = Document::default();
for (field_name, field_values) in named_doc.0 {
if let Some(field) = self.get_field(&field_name) {
for field_value in field_values {
doc.add(FieldValue::new(field, field_value));
}
} else {
return Err(DocParsingError::NoSuchFieldInSchema(field_name.clone()));
}
}
Ok(doc)
}
/// Encode the schema in JSON. /// Encode the schema in JSON.
/// ///
/// Encoding a document cannot fail. /// Encoding a document cannot fail.
@@ -297,6 +279,7 @@ impl Schema {
}; };
DocParsingError::NotJSON(doc_json_sample) DocParsingError::NotJSON(doc_json_sample)
})?; })?;
let mut doc = Document::default(); let mut doc = Document::default();
for (field_name, json_value) in json_obj.iter() { for (field_name, json_value) in json_obj.iter() {
match self.get_field(field_name) { match self.get_field(field_name) {
@@ -377,16 +360,13 @@ impl<'de> Deserialize<'de> for Schema {
/// Error that may happen when deserializing /// Error that may happen when deserializing
/// a document from JSON. /// a document from JSON.
#[derive(Debug, Fail)] #[derive(Debug)]
pub enum DocParsingError { pub enum DocParsingError {
/// The payload given is not valid JSON. /// The payload given is not valid JSON.
#[fail(display = "The provided string is not valid JSON")]
NotJSON(String), NotJSON(String),
/// One of the value node could not be parsed. /// One of the value node could not be parsed.
#[fail(display = "The field '{:?}' could not be parsed: {:?}", _0, _1)]
ValueError(String, ValueParsingError), ValueError(String, ValueParsingError),
/// The json-document contains a field that is not declared in the schema. /// The json-document contains a field that is not declared in the schema.
#[fail(display = "The json-document contains an unknown field: {:?}", _0)]
NoSuchFieldInSchema(String), NoSuchFieldInSchema(String),
} }