Compare commits

..

6 Commits

Author SHA1 Message Date
Paul Masurel
d7973892a2 extra commit 2019-12-27 22:53:04 +09:00
Paul Masurel
cd7484c035 Added ReadOnlyDirectory and implemented Bundle Directory 2019-12-27 12:05:39 +09:00
Paul Masurel
7ed6bc8718 Added serialize to bundle in the RAMDirectory. 2019-12-26 10:06:52 +09:00
Paul Masurel
d12a06b65b Tiny code simplification. 2019-12-26 09:33:17 +09:00
Minoru Osuka
749432f949 Make SchemaBuilder::add_field() public (#742)
* Make add_field() to public

* cargo format
2019-12-25 20:37:34 +09:00
Paul Masurel
c1400f25a7 Handle facet search in the QueryParser. (#741)
Closes #738
2019-12-25 17:43:33 +09:00
25 changed files with 611 additions and 208 deletions

View File

@@ -13,63 +13,100 @@
// ---
// Importing tantivy...
use tantivy::collector::FacetCollector;
use tantivy::query::AllQuery;
use tantivy::query::{AllQuery, TermQuery};
use tantivy::schema::*;
use tantivy::{doc, Index};
use tempfile::TempDir;
fn main() -> tantivy::Result<()> {
// Let's create a temporary directory for the
// sake of this example
let index_path = TempDir::new()?;
// Let's create a temporary directory for the sake of this example
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("name", TEXT | STORED);
// this is our faceted field
schema_builder.add_facet_field("tags");
let name = schema_builder.add_text_field("felin_name", TEXT | STORED);
// this is our faceted field: its scientific classification
let classification = schema_builder.add_facet_field("classification");
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let index = Index::create_in_dir(&index_path, schema.clone())?;
let mut index_writer = index.writer(50_000_000)?;
let name = schema.get_field("name").unwrap();
let tags = schema.get_field("tags").unwrap();
let mut index_writer = index.writer(30_000_000)?;
// For convenience, tantivy also comes with a macro to
// reduce the boilerplate above.
index_writer.add_document(doc!(
name => "the ditch",
tags => Facet::from("/pools/north")
name => "Cat",
classification => Facet::from("/Felidae/Felinae/Felis")
));
index_writer.add_document(doc!(
name => "little stacey",
tags => Facet::from("/pools/south")
name => "Canada lynx",
classification => Facet::from("/Felidae/Felinae/Lynx")
));
index_writer.add_document(doc!(
name => "Cheetah",
classification => Facet::from("/Felidae/Felinae/Acinonyx")
));
index_writer.add_document(doc!(
name => "Tiger",
classification => Facet::from("/Felidae/Pantherinae/Panthera")
));
index_writer.add_document(doc!(
name => "Lion",
classification => Facet::from("/Felidae/Pantherinae/Panthera")
));
index_writer.add_document(doc!(
name => "Jaguar",
classification => Facet::from("/Felidae/Pantherinae/Panthera")
));
index_writer.add_document(doc!(
name => "Sunda clouded leopard",
classification => Facet::from("/Felidae/Pantherinae/Neofelis")
));
index_writer.add_document(doc!(
name => "Fossa",
classification => Facet::from("/Eupleridae/Cryptoprocta")
));
index_writer.commit()?;
let reader = index.reader()?;
let searcher = reader.searcher();
{
let mut facet_collector = FacetCollector::for_field(classification);
facet_collector.add_facet("/Felidae");
let facet_counts = searcher.search(&AllQuery, &facet_collector)?;
// This lists all of the facet counts, right below "/Felidae".
let facets: Vec<(&Facet, u64)> = facet_counts.get("/Felidae").collect();
assert_eq!(
facets,
vec![
(&Facet::from("/Felidae/Felinae"), 3),
(&Facet::from("/Felidae/Pantherinae"), 4),
]
);
}
let mut facet_collector = FacetCollector::for_field(tags);
facet_collector.add_facet("/pools");
// Facets are also searchable.
//
// For instance a common UI pattern is to allow the user someone to click on a facet link
// (e.g: `Pantherinae`) to drill down and filter the current result set with this subfacet.
//
// The search would then look as follows.
let facet_counts = searcher.search(&AllQuery, &facet_collector).unwrap();
// This lists all of the facet counts
let facets: Vec<(&Facet, u64)> = facet_counts.get("/pools").collect();
assert_eq!(
facets,
vec![
(&Facet::from("/pools/north"), 1),
(&Facet::from("/pools/south"), 1),
]
);
// Check the reference doc for different ways to create a `Facet` object.
{
let facet = Facet::from_text("/Felidae/Pantherinae");
let facet_term = Term::from_facet(classification, &facet);
let facet_term_query = TermQuery::new(facet_term, IndexRecordOption::Basic);
let mut facet_collector = FacetCollector::for_field(classification);
facet_collector.add_facet("/Felidae/Pantherinae");
let facet_counts = searcher.search(&facet_term_query, &facet_collector)?;
let facets: Vec<(&Facet, u64)> = facet_counts.get("/Felidae/Pantherinae").collect();
assert_eq!(
facets,
vec![
(&Facet::from("/Felidae/Pantherinae/Neofelis"), 1),
(&Facet::from("/Felidae/Pantherinae/Panthera"), 3),
]
);
}
Ok(())
}

View File

@@ -452,9 +452,11 @@ impl FacetCounts {
#[cfg(test)]
mod tests {
use super::{FacetCollector, FacetCounts};
use crate::collector::Count;
use crate::core::Index;
use crate::query::AllQuery;
use crate::schema::{Document, Facet, Field, Schema};
use crate::query::{AllQuery, QueryParser, TermQuery};
use crate::schema::{Document, Facet, Field, IndexRecordOption, Schema};
use crate::Term;
use rand::distributions::Uniform;
use rand::prelude::SliceRandom;
use rand::{thread_rng, Rng};
@@ -544,6 +546,56 @@ mod tests {
assert_eq!(facets[0].1, 1);
}
#[test]
fn test_doc_search_by_facet() {
let mut schema_builder = Schema::builder();
let facet_field = schema_builder.add_facet_field("facet");
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.add_document(doc!(
facet_field => Facet::from_text(&"/A/A"),
));
index_writer.add_document(doc!(
facet_field => Facet::from_text(&"/A/B"),
));
index_writer.add_document(doc!(
facet_field => Facet::from_text(&"/A/C/A"),
));
index_writer.add_document(doc!(
facet_field => Facet::from_text(&"/D/C/A"),
));
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 4);
let count_facet = |facet_str: &str| {
let term = Term::from_facet(facet_field, &Facet::from_text(facet_str));
searcher
.search(&TermQuery::new(term, IndexRecordOption::Basic), &Count)
.unwrap()
};
assert_eq!(count_facet("/"), 4);
assert_eq!(count_facet("/A"), 3);
assert_eq!(count_facet("/A/B"), 1);
assert_eq!(count_facet("/A/C"), 1);
assert_eq!(count_facet("/A/C/A"), 1);
assert_eq!(count_facet("/C/A"), 0);
{
let query_parser = QueryParser::for_index(&index, vec![]);
{
let query = query_parser.parse_query("facet:/A/B").unwrap();
assert_eq!(1, searcher.search(&query, &Count).unwrap());
}
{
let query = query_parser.parse_query("facet:/A").unwrap();
assert_eq!(3, searcher.search(&query, &Count).unwrap());
}
}
}
#[test]
fn test_non_used_facet_collector() {
let mut facet_collector = FacetCollector::for_field(Field::from_field_id(0));

View File

@@ -186,7 +186,7 @@ mod test {
use super::{CompositeFile, CompositeWrite};
use crate::common::BinarySerializable;
use crate::common::VInt;
use crate::directory::{Directory, RAMDirectory};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory};
use crate::schema::Field;
use std::io::Write;
use std::path::Path;

View File

@@ -338,7 +338,7 @@ impl Index {
/// Creates a new segment.
pub fn new_segment(&self) -> Segment {
let segment_meta = self
let mut segment_meta = self
.inventory
.new_segment_meta(SegmentId::generate_random(), 0);
self.segment(segment_meta)

View File

@@ -35,6 +35,7 @@ impl SegmentMetaInventory {
segment_id,
max_doc,
deletes: None,
bundled: false,
};
SegmentMeta::from(self.inventory.track(inner))
}
@@ -81,6 +82,19 @@ impl SegmentMeta {
self.tracked.segment_id
}
pub fn with_bundled(self) -> SegmentMeta {
SegmentMeta::from(self.tracked.map(|inner| InnerSegmentMeta {
segment_id: inner.segment_id,
max_doc: inner.max_doc,
deletes: inner.deletes.clone(),
bundled: true,
}))
}
pub fn is_bundled(&self) -> bool {
self.tracked.bundled
}
/// Returns the number of deleted documents.
pub fn num_deleted_docs(&self) -> u32 {
self.tracked
@@ -107,8 +121,12 @@ impl SegmentMeta {
/// It just joins the segment id with the extension
/// associated to a segment component.
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
let mut path = self.id().uuid_string();
path.push_str(&*match component {
let suffix = self.suffix(component);
self.relative_path_from_suffix(&suffix)
}
fn suffix(&self, component: SegmentComponent) -> String {
match component {
SegmentComponent::POSTINGS => ".idx".to_string(),
SegmentComponent::POSITIONS => ".pos".to_string(),
SegmentComponent::POSITIONSSKIP => ".posidx".to_string(),
@@ -117,7 +135,17 @@ impl SegmentMeta {
SegmentComponent::FASTFIELDS => ".fast".to_string(),
SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
SegmentComponent::DELETE => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
});
}
}
/// Returns the relative path of a component of our segment.
///
/// It just joins the segment id with the extension
/// associated to a segment component.
pub fn relative_path_from_suffix(&self, suffix: &str) -> PathBuf {
let mut path = self.id().uuid_string();
path.push_str(".");
path.push_str(&suffix);
PathBuf::from(path)
}
@@ -161,6 +189,7 @@ impl SegmentMeta {
segment_id: inner_meta.segment_id,
max_doc,
deletes: None,
bundled: inner_meta.bundled,
});
SegmentMeta { tracked }
}
@@ -175,6 +204,7 @@ impl SegmentMeta {
segment_id: inner_meta.segment_id,
max_doc: inner_meta.max_doc,
deletes: Some(delete_meta),
bundled: inner_meta.bundled,
});
SegmentMeta { tracked }
}
@@ -185,6 +215,7 @@ struct InnerSegmentMeta {
segment_id: SegmentId,
max_doc: u32,
deletes: Option<DeleteMeta>,
bundled: bool,
}
impl InnerSegmentMeta {

View File

@@ -4,14 +4,12 @@ use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::directory::error::{OpenReadError, OpenWriteError};
use crate::directory::Directory;
use crate::directory::{ReadOnlySource, WritePtr};
use crate::directory::{ReadOnlyDirectory, ReadOnlySource, WritePtr};
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::schema::Schema;
use crate::Opstamp;
use crate::Result;
use std::fmt;
use std::path::PathBuf;
use std::result;
/// A segment is a piece of the index.
#[derive(Clone)]
@@ -83,23 +81,30 @@ impl Segment {
}
/// Open one of the component file for a *regular* read.
pub fn open_read(
&self,
component: SegmentComponent,
) -> result::Result<ReadOnlySource, OpenReadError> {
pub fn open_read(&self, component: SegmentComponent) -> Result<ReadOnlySource, OpenReadError> {
let path = self.relative_path(component);
let source = self.index.directory().open_read(&path)?;
Ok(source)
}
/// Open one of the component file for *regular* write.
pub fn open_write(
pub fn open_write(&mut self, component: SegmentComponent) -> Result<WritePtr, OpenWriteError> {
let path = self.relative_path(component);
self.index.directory_mut().open_write(&path)
}
pub fn open_bundle_writer(&mut self) -> Result<WritePtr, OpenWriteError> {
let path = self.meta.relative_path_from_suffix("bundle");
self.index.directory_mut().open_write(&path)
}
pub(crate) fn open_write_in_directory(
&mut self,
component: SegmentComponent,
) -> result::Result<WritePtr, OpenWriteError> {
directory: &mut dyn Directory,
) -> Result<WritePtr, OpenWriteError> {
let path = self.relative_path(component);
let write = self.index.directory_mut().open_write(&path)?;
Ok(write)
directory.open_write(&path)
}
}
@@ -109,5 +114,5 @@ pub trait SerializableSegment {
///
/// # Returns
/// The number of documents in the segment.
fn write(&self, serializer: SegmentSerializer) -> Result<u32>;
fn write(&self, serializer: SegmentSerializer) -> crate::Result<u32>;
}

View File

@@ -0,0 +1,97 @@
use crate::directory::directory::ReadOnlyDirectory;
use crate::directory::error::OpenReadError;
use crate::directory::ReadOnlySource;
use crate::error::DataCorruption;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[derive(Clone)]
struct BundleDirectory {
source_map: Arc<HashMap<PathBuf, ReadOnlySource>>,
}
impl BundleDirectory {
pub fn from_source(source: ReadOnlySource) -> Result<BundleDirectory, DataCorruption> {
let mut index_offset_buf = [0u8; 8];
let (body_idx, footer_offset) = source.split_from_end(8);
index_offset_buf.copy_from_slice(footer_offset.as_slice());
let offset = u64::from_le_bytes(index_offset_buf);
let (body_source, idx_source) = body_idx.split(offset as usize);
let idx: HashMap<PathBuf, (u64, u64)> = serde_json::from_slice(idx_source.as_slice())
.map_err(|err| {
let msg = format!("Failed to read index from bundle. {:?}", err);
DataCorruption::comment_only(msg)
})?;
let source_map: HashMap<PathBuf, ReadOnlySource> = idx
.into_iter()
.map(|(path, (start, stop))| {
let source = body_source.slice(start as usize, stop as usize);
(path, source)
})
.collect();
Ok(BundleDirectory {
source_map: Arc::new(source_map),
})
}
}
impl ReadOnlyDirectory for BundleDirectory {
fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
self.source_map
.get(path)
.cloned()
.ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))
}
fn exists(&self, path: &Path) -> bool {
self.source_map.contains_key(path)
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
let source = self
.source_map
.get(path)
.ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?;
Ok(source.as_slice().to_vec())
}
}
#[cfg(test)]
mod tests {
use super::BundleDirectory;
use crate::directory::{RAMDirectory, ReadOnlyDirectory, TerminatingWrite};
use crate::Directory;
use std::io::Write;
use std::path::Path;
#[test]
fn test_bundle_directory() {
let mut ram_directory = RAMDirectory::default();
let test_path_atomic = Path::new("testpath_atomic");
let test_path_wrt = Path::new("testpath_wrt");
assert!(ram_directory
.atomic_write(test_path_atomic, b"titi")
.is_ok());
{
let mut test_wrt = ram_directory.open_write(test_path_wrt).unwrap();
assert!(test_wrt.write_all(b"toto").is_ok());
assert!(test_wrt.terminate().is_ok());
}
let mut dest_directory = RAMDirectory::default();
let bundle_path = Path::new("bundle");
let mut wrt = dest_directory.open_write(bundle_path).unwrap();
assert!(ram_directory.serialize_bundle(&mut wrt).is_ok());
assert!(wrt.terminate().is_ok());
let source = dest_directory.open_read(bundle_path).unwrap();
let bundle_directory = BundleDirectory::from_source(source).unwrap();
assert_eq!(
&bundle_directory.atomic_read(test_path_atomic).unwrap()[..],
b"titi"
);
assert_eq!(
&bundle_directory.open_read(test_path_wrt).unwrap()[..],
b"toto"
);
}
}

View File

@@ -100,17 +100,7 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
}
}
/// Write-once read many (WORM) abstraction for where
/// tantivy's data should be stored.
///
/// There are currently two implementations of `Directory`
///
/// - The [`MMapDirectory`](struct.MmapDirectory.html), this
/// should be your default choice.
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
/// should be used mostly for tests.
///
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
pub trait ReadOnlyDirectory {
/// Opens a virtual file for read.
///
/// Once a virtual file is open, its data may not
@@ -122,6 +112,31 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// You should only use this to read files create with [Directory::open_write].
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError>;
/// Returns true iff the file exists
fn exists(&self, path: &Path) -> bool;
/// Reads the full content file that has been written using
/// atomic_write.
///
/// This should only be used for small files.
///
/// You should only use this to read files create with [Directory::atomic_write].
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
}
/// Write-once read many (WORM) abstraction for where
/// tantivy's data should be stored.
///
/// There are currently two implementations of `Directory`
///
/// - The [`MMapDirectory`](struct.MmapDirectory.html), this
/// should be your default choice.
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
/// should be used mostly for tests.
///
pub trait Directory:
DirectoryClone + ReadOnlyDirectory + fmt::Debug + Send + Sync + 'static
{
/// Removes a file
///
/// Removing a file will not affect an eventual
@@ -131,9 +146,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// `DeleteError::DoesNotExist`.
fn delete(&self, path: &Path) -> result::Result<(), DeleteError>;
/// Returns true iff the file exists
fn exists(&self, path: &Path) -> bool;
/// Opens a writer for the *virtual file* associated with
/// a Path.
///
@@ -155,14 +167,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// The file may not previously exist.
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError>;
/// Reads the full content file that has been written using
/// atomic_write.
///
/// This should only be used for small files.
///
/// You should only use this to read files create with [Directory::atomic_write].
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
/// Atomically replace the content of a file with data.
///
/// This calls ensure that reads can never *observe*

View File

@@ -10,6 +10,7 @@ use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption;
use crate::Directory;
use crate::directory::directory::ReadOnlyDirectory;
use crc32fast::Hasher;
use serde_json;
use std::collections::HashSet;
@@ -264,14 +265,6 @@ impl ManagedDirectory {
}
impl Directory for ManagedDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
let read_only_source = self.directory.open_read(path)?;
let (footer, reader) = Footer::extract_footer(read_only_source)
.map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
footer.is_compatible()?;
Ok(reader)
}
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
self.register_file_as_managed(path)
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
@@ -289,18 +282,10 @@ impl Directory for ManagedDirectory {
self.directory.atomic_write(path, data)
}
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
self.directory.atomic_read(path)
}
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
self.directory.delete(path)
}
fn exists(&self, path: &Path) -> bool {
self.directory.exists(path)
}
fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
self.directory.acquire_lock(lock)
}
@@ -310,6 +295,24 @@ impl Directory for ManagedDirectory {
}
}
impl ReadOnlyDirectory for ManagedDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
let read_only_source = self.directory.open_read(path)?;
let (footer, reader) = Footer::extract_footer(read_only_source)
.map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
footer.is_compatible()?;
Ok(reader)
}
fn exists(&self, path: &Path) -> bool {
self.directory.exists(path)
}
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
self.directory.atomic_read(path)
}
}
impl Clone for ManagedDirectory {
fn clone(&self) -> ManagedDirectory {
ManagedDirectory {
@@ -323,7 +326,9 @@ impl Clone for ManagedDirectory {
#[cfg(test)]
mod tests_mmap_specific {
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};
use crate::directory::{
Directory, ManagedDirectory, MmapDirectory, ReadOnlyDirectory, TerminatingWrite,
};
use std::collections::HashSet;
use std::fs::OpenOptions;
use std::io::Write;

View File

@@ -6,6 +6,7 @@ use self::notify::RawEvent;
use self::notify::RecursiveMode;
use self::notify::Watcher;
use crate::core::META_FILEPATH;
use crate::directory::directory::ReadOnlyDirectory;
use crate::directory::error::LockError;
use crate::directory::error::{
DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError,
@@ -131,14 +132,13 @@ impl MmapCache {
}
self.cache.remove(full_path);
self.counters.miss += 1;
Ok(if let Some(mmap) = open_mmap(full_path)? {
let mmap_opt = open_mmap(full_path)?;
Ok(mmap_opt.map(|mmap| {
let mmap_arc: Arc<BoxedData> = Arc::new(Box::new(mmap));
let mmap_weak = Arc::downgrade(&mmap_arc);
self.cache.insert(full_path.to_owned(), mmap_weak);
Some(mmap_arc)
} else {
None
})
mmap_arc
}))
}
}
@@ -408,24 +408,6 @@ impl TerminatingWrite for SafeFileWriter {
}
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
let msg = format!(
"Failed to acquired write lock \
on mmap cache while reading {:?}",
path
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
Ok(mmap_cache
.get_mmap(&full_path)?
.map(ReadOnlySource::from)
.unwrap_or_else(ReadOnlySource::empty))
}
/// Any entry associated to the path in the mmap will be
/// removed before the file is deleted.
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
@@ -444,11 +426,6 @@ impl Directory for MmapDirectory {
}
}
fn exists(&self, path: &Path) -> bool {
let full_path = self.resolve_path(path);
full_path.exists()
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
debug!("Open Write {:?}", path);
let full_path = self.resolve_path(path);
@@ -479,25 +456,6 @@ impl Directory for MmapDirectory {
Ok(BufWriter::new(Box::new(writer)))
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
let full_path = self.resolve_path(path);
let mut buffer = Vec::new();
match File::open(&full_path) {
Ok(mut file) => {
file.read_to_end(&mut buffer)
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
Ok(buffer)
}
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {
Err(OpenReadError::FileDoesNotExist(path.to_owned()))
} else {
Err(IOError::with_path(path.to_owned(), e).into())
}
}
}
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let full_path = self.resolve_path(path);
@@ -531,6 +489,50 @@ impl Directory for MmapDirectory {
}
}
impl ReadOnlyDirectory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
let msg = format!(
"Failed to acquired write lock \
on mmap cache while reading {:?}",
path
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
Ok(mmap_cache
.get_mmap(&full_path)?
.map(ReadOnlySource::from)
.unwrap_or_else(ReadOnlySource::empty))
}
fn exists(&self, path: &Path) -> bool {
let full_path = self.resolve_path(path);
full_path.exists()
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
let full_path = self.resolve_path(path);
let mut buffer = Vec::new();
match File::open(&full_path) {
Ok(mut file) => {
file.read_to_end(&mut buffer)
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
Ok(buffer)
}
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {
Err(OpenReadError::FileDoesNotExist(path.to_owned()))
} else {
Err(IOError::with_path(path.to_owned(), e).into())
}
}
}
}
}
#[cfg(test)]
mod tests {

View File

@@ -7,6 +7,7 @@ WORM directory abstraction.
#[cfg(feature = "mmap")]
mod mmap_directory;
mod bundle_directory;
mod directory;
mod directory_lock;
mod footer;
@@ -19,7 +20,7 @@ mod watch_event_router;
pub mod error;
pub use self::directory::DirectoryLock;
pub use self::directory::{Directory, DirectoryClone};
pub use self::directory::{Directory, DirectoryClone, ReadOnlyDirectory};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::ram_directory::RAMDirectory;
pub use self::read_only_source::ReadOnlySource;

View File

@@ -1,4 +1,6 @@
use crate::common::CountingWriter;
use crate::core::META_FILEPATH;
use crate::directory::directory::ReadOnlyDirectory;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::AntiCallToken;
use crate::directory::WatchCallbackList;
@@ -115,6 +117,22 @@ impl InnerDirectory {
fn total_mem_usage(&self) -> usize {
self.fs.values().map(|f| f.len()).sum()
}
fn serialize_bundle(&self, wrt: &mut WritePtr) -> io::Result<()> {
let mut counting_writer = CountingWriter::wrap(wrt);
let mut file_index: HashMap<PathBuf, (u64, u64)> = HashMap::default();
for (path, source) in &self.fs {
let start = counting_writer.written_bytes();
counting_writer.write_all(source.as_slice())?;
let stop = counting_writer.written_bytes();
file_index.insert(path.to_path_buf(), (start, stop));
}
let index_offset = counting_writer.written_bytes();
serde_json::to_writer(&mut counting_writer, &file_index)?;
let index_offset_buffer = index_offset.to_le_bytes();
counting_writer.write_all(&index_offset_buffer[..])?;
Ok(())
}
}
impl fmt::Debug for RAMDirectory {
@@ -144,13 +162,18 @@ impl RAMDirectory {
pub fn total_mem_usage(&self) -> usize {
self.fs.read().unwrap().total_mem_usage()
}
/// Serialize the RAMDirectory into a bundle.
///
/// This method will fail, write nothing, and return an error if a
/// clone of this repository exists.
pub fn serialize_bundle(self, wrt: &mut WritePtr) -> io::Result<()> {
let inner_directory_rlock = self.fs.read().unwrap();
inner_directory_rlock.serialize_bundle(wrt)
}
}
impl Directory for RAMDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
self.fs.read().unwrap().open_read(path)
}
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
fail_point!("RAMDirectory::delete", |_| {
use crate::directory::error::IOError;
@@ -160,10 +183,6 @@ impl Directory for RAMDirectory {
self.fs.write().unwrap().delete(path)
}
fn exists(&self, path: &Path) -> bool {
self.fs.read().unwrap().exists(path)
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
let mut fs = self.fs.write().unwrap();
let path_buf = PathBuf::from(path);
@@ -177,10 +196,6 @@ impl Directory for RAMDirectory {
}
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
Ok(self.open_read(path)?.as_slice().to_owned())
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
fail_point!("RAMDirectory::atomic_write", |msg| Err(io::Error::new(
io::ErrorKind::Other,
@@ -204,3 +219,17 @@ impl Directory for RAMDirectory {
Ok(self.fs.write().unwrap().watch(watch_callback))
}
}
impl ReadOnlyDirectory for RAMDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
self.fs.read().unwrap().open_read(path)
}
fn exists(&self, path: &Path) -> bool {
self.fs.read().unwrap().exists(path)
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
Ok(self.open_read(path)?.as_slice().to_owned())
}
}

View File

@@ -25,10 +25,10 @@ impl DataCorruption {
}
}
pub fn comment_only(comment: String) -> DataCorruption {
pub fn comment_only<TS: ToString>(comment: TS) -> DataCorruption {
DataCorruption {
filepath: None,
comment,
comment: comment.to_string(),
}
}
}

View File

@@ -179,7 +179,7 @@ mod tests {
use super::*;
use crate::common::CompositeFile;
use crate::directory::{Directory, RAMDirectory, WritePtr};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
use crate::fastfield::FastFieldReader;
use crate::merge_policy::NoMergePolicy;
use crate::schema::Field;

View File

@@ -4,7 +4,7 @@ use crate::common::compute_num_bits;
use crate::common::BinarySerializable;
use crate::common::CompositeFile;
use crate::directory::ReadOnlySource;
use crate::directory::{Directory, RAMDirectory, WritePtr};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
use crate::fastfield::{FastFieldSerializer, FastFieldsWriter};
use crate::schema::Schema;
use crate::schema::FAST;

View File

@@ -1,10 +1,13 @@
use crate::Result;
use crate::Directory;
use crate::core::Segment;
use crate::core::SegmentComponent;
use crate::directory::error::OpenWriteError;
use crate::directory::{DirectoryClone, RAMDirectory, TerminatingWrite, WritePtr};
use crate::fastfield::FastFieldSerializer;
use crate::fieldnorm::FieldNormsSerializer;
use crate::postings::InvertedIndexSerializer;
use crate::schema::Schema;
use crate::store::StoreWriter;
/// Segment serializer is in charge of laying out on disk
@@ -14,25 +17,50 @@ pub struct SegmentSerializer {
fast_field_serializer: FastFieldSerializer,
fieldnorms_serializer: FieldNormsSerializer,
postings_serializer: InvertedIndexSerializer,
bundle_writer: Option<(RAMDirectory, WritePtr)>,
}
pub(crate) struct SegmentSerializerWriters {
postings_wrt: WritePtr,
positions_skip_wrt: WritePtr,
positions_wrt: WritePtr,
terms_wrt: WritePtr,
fast_field_wrt: WritePtr,
fieldnorms_wrt: WritePtr,
store_wrt: WritePtr,
}
impl SegmentSerializerWriters {
pub(crate) fn for_segment(segment: &mut Segment) -> Result<Self, OpenWriteError> {
Ok(SegmentSerializerWriters {
postings_wrt: segment.open_write(SegmentComponent::POSTINGS)?,
positions_skip_wrt: segment.open_write(SegmentComponent::POSITIONS)?,
positions_wrt: segment.open_write(SegmentComponent::POSITIONSSKIP)?,
terms_wrt: segment.open_write(SegmentComponent::TERMS)?,
fast_field_wrt: segment.open_write(SegmentComponent::FASTFIELDS)?,
fieldnorms_wrt: segment.open_write(SegmentComponent::FIELDNORMS)?,
store_wrt: segment.open_write(SegmentComponent::STORE)?,
})
}
}
impl SegmentSerializer {
/// Creates a new `SegmentSerializer`.
pub fn for_segment(segment: &mut Segment) -> Result<SegmentSerializer> {
let store_write = segment.open_write(SegmentComponent::STORE)?;
let fast_field_write = segment.open_write(SegmentComponent::FASTFIELDS)?;
let fast_field_serializer = FastFieldSerializer::from_write(fast_field_write)?;
let fieldnorms_write = segment.open_write(SegmentComponent::FIELDNORMS)?;
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;
let postings_serializer = InvertedIndexSerializer::open(segment)?;
pub(crate) fn new(schema: Schema, writers: SegmentSerializerWriters) -> crate::Result<Self> {
let fast_field_serializer = FastFieldSerializer::from_write(writers.fast_field_wrt)?;
let fieldnorms_serializer = FieldNormsSerializer::from_write(writers.fieldnorms_wrt)?;
let postings_serializer = InvertedIndexSerializer::open(
schema,
writers.terms_wrt,
writers.postings_wrt,
writers.positions_wrt,
writers.positions_skip_wrt,
);
Ok(SegmentSerializer {
store_writer: StoreWriter::new(store_write),
store_writer: StoreWriter::new(writers.store_wrt),
fast_field_serializer,
fieldnorms_serializer,
postings_serializer,
bundle_writer: None,
})
}
@@ -57,11 +85,15 @@ impl SegmentSerializer {
}
/// Finalize the segment serialization.
pub fn close(self) -> Result<()> {
pub fn close(mut self) -> crate::Result<()> {
self.fast_field_serializer.close()?;
self.postings_serializer.close()?;
self.store_writer.close()?;
self.fieldnorms_serializer.close()?;
if let Some((ram_directory, mut bundle_wrt)) = self.bundle_writer.take() {
ram_directory.serialize_bundle(&mut bundle_wrt)?;
bundle_wrt.terminate()?;
}
Ok(())
}
}

View File

@@ -12,6 +12,7 @@ use crate::indexer::index_writer::advance_deletes;
use crate::indexer::merge_operation::MergeOperationInventory;
use crate::indexer::merger::IndexMerger;
use crate::indexer::segment_manager::SegmentsStatus;
use crate::indexer::segment_serializer::SegmentSerializerWriters;
use crate::indexer::stamper::Stamper;
use crate::indexer::SegmentEntry;
use crate::indexer::SegmentSerializer;
@@ -132,7 +133,9 @@ fn merge(
let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?;
// ... we just serialize this index merger in our new segment to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?;
let segment_serializer_wrts = SegmentSerializerWriters::for_segment(&mut merged_segment)?;
let segment_serializer =
SegmentSerializer::new(merged_segment.schema(), segment_serializer_wrts)?;
let num_docs = merger.write(segment_serializer)?;

View File

@@ -3,7 +3,7 @@ use crate::core::Segment;
use crate::core::SerializableSegment;
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::FieldNormsWriter;
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::indexer::segment_serializer::{SegmentSerializer, SegmentSerializerWriters};
use crate::postings::compute_table_size;
use crate::postings::MultiFieldPostingsWriter;
use crate::schema::FieldType;
@@ -69,7 +69,8 @@ impl SegmentWriter {
schema: &Schema,
) -> Result<SegmentWriter> {
let table_num_bits = initial_table_size(memory_budget)?;
let segment_serializer = SegmentSerializer::for_segment(&mut segment)?;
let segment_serializer_wrts = SegmentSerializerWriters::for_segment(&mut segment)?;
let segment_serializer = SegmentSerializer::new(segment.schema(), segment_serializer_wrts)?;
let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
let tokenizers = schema
.fields()

View File

@@ -75,7 +75,7 @@ pub mod tests {
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut segment = index.new_segment();
let mut posting_serializer = InvertedIndexSerializer::open(&mut segment).unwrap();
let mut posting_serializer = InvertedIndexSerializer::for_segment(&mut segment).unwrap();
{
let mut field_serializer = posting_serializer.new_field(text_field, 120 * 4).unwrap();
field_serializer.new_term("abc".as_bytes()).unwrap();

View File

@@ -10,8 +10,8 @@ use crate::postings::USE_SKIP_INFO_LIMIT;
use crate::schema::Schema;
use crate::schema::{Field, FieldEntry, FieldType};
use crate::termdict::{TermDictionaryBuilder, TermOrdinal};
use crate::DocId;
use crate::Result;
use crate::{Directory, DocId};
use std::io::{self, Write};
/// `InvertedIndexSerializer` is in charge of serializing
@@ -54,33 +54,36 @@ pub struct InvertedIndexSerializer {
}
impl InvertedIndexSerializer {
/// Open a new `InvertedIndexSerializer` for the given segment
fn create(
terms_write: CompositeWrite<WritePtr>,
postings_write: CompositeWrite<WritePtr>,
positions_write: CompositeWrite<WritePtr>,
positionsidx_write: CompositeWrite<WritePtr>,
schema: Schema,
) -> Result<InvertedIndexSerializer> {
Ok(InvertedIndexSerializer {
terms_write,
postings_write,
positions_write,
positionsidx_write,
pub(crate) fn for_segment(segment: &mut Segment) -> crate::Result<Self> {
let schema = segment.schema();
use crate::core::SegmentComponent;
let terms_wrt = segment.open_write(SegmentComponent::TERMS)?;
let postings_wrt = segment.open_write(SegmentComponent::POSTINGS)?;
let positions_wrt = segment.open_write(SegmentComponent::POSITIONS)?;
let positions_idx_wrt = segment.open_write(SegmentComponent::POSITIONSSKIP)?;
Ok(Self::open(
schema,
})
terms_wrt,
postings_wrt,
positions_wrt,
positions_idx_wrt,
))
}
/// Open a new `PostingsSerializer` for the given segment
pub fn open(segment: &mut Segment) -> Result<InvertedIndexSerializer> {
use crate::SegmentComponent::{POSITIONS, POSITIONSSKIP, POSTINGS, TERMS};
InvertedIndexSerializer::create(
CompositeWrite::wrap(segment.open_write(TERMS)?),
CompositeWrite::wrap(segment.open_write(POSTINGS)?),
CompositeWrite::wrap(segment.open_write(POSITIONS)?),
CompositeWrite::wrap(segment.open_write(POSITIONSSKIP)?),
segment.schema(),
)
pub(crate) fn open(
schema: Schema,
terms_wrt: WritePtr,
postings_wrt: WritePtr,
positions_wrt: WritePtr,
positions_idx_wrt: WritePtr,
) -> InvertedIndexSerializer {
InvertedIndexSerializer {
terms_write: CompositeWrite::wrap(terms_wrt),
postings_write: CompositeWrite::wrap(postings_wrt),
positions_write: CompositeWrite::wrap(positions_wrt),
positionsidx_write: CompositeWrite::wrap(positions_idx_wrt),
schema,
}
}
/// Must be called before starting pushing terms of

View File

@@ -8,7 +8,7 @@ use crate::query::PhraseQuery;
use crate::query::Query;
use crate::query::RangeQuery;
use crate::query::TermQuery;
use crate::schema::IndexRecordOption;
use crate::schema::{Facet, IndexRecordOption};
use crate::schema::{Field, Schema};
use crate::schema::{FieldType, Term};
use crate::tokenizer::TokenizerManager;
@@ -319,7 +319,10 @@ impl QueryParser {
))
}
}
FieldType::HierarchicalFacet => Ok(vec![(0, Term::from_field_text(field, phrase))]),
FieldType::HierarchicalFacet => {
let facet = Facet::from_text(phrase);
Ok(vec![(0, Term::from_field_text(field, facet.encoded_str()))])
}
FieldType::Bytes => {
let field_name = self.schema.get_field_name(field).to_string();
Err(QueryParserError::FieldNotIndexed(field_name))
@@ -554,6 +557,7 @@ mod test {
schema_builder.add_text_field("with_stop_words", text_options);
schema_builder.add_date_field("date", INDEXED);
schema_builder.add_f64_field("float", INDEXED);
schema_builder.add_facet_field("facet");
let schema = schema_builder.build();
let default_fields = vec![title, text];
let tokenizer_manager = TokenizerManager::default();
@@ -588,9 +592,13 @@ mod test {
}
#[test]
pub fn test_parse_query_simple() {
pub fn test_parse_query_facet() {
let query_parser = make_query_parser();
assert!(query_parser.parse_query("toto").is_ok());
let query = query_parser.parse_query("facet:/root/branch/leaf").unwrap();
assert_eq!(
format!("{:?}", query),
"TermQuery(Term(field=11,bytes=[114, 111, 111, 116, 0, 98, 114, 97, 110, 99, 104, 0, 108, 101, 97, 102]))"
);
}
#[test]

View File

@@ -166,7 +166,7 @@ impl SchemaBuilder {
}
/// Adds a field entry to the schema in build.
fn add_field(&mut self, field_entry: FieldEntry) -> Field {
pub fn add_field(&mut self, field_entry: FieldEntry) -> Field {
let field = Field::from_field_id(self.fields.len() as u32);
let field_name = field_entry.name().to_string();
self.fields.push(field_entry);
@@ -401,6 +401,7 @@ pub enum DocParsingError {
mod tests {
use crate::schema::field_type::ValueParsingError;
use crate::schema::int_options::Cardinality::SingleValue;
use crate::schema::schema::DocParsingError::NotJSON;
use crate::schema::*;
use matches::{assert_matches, matches};
@@ -715,4 +716,94 @@ mod tests {
assert_matches!(json_err, Err(NotJSON(_)));
}
}
#[test]
pub fn test_schema_add_field() {
let mut schema_builder = SchemaBuilder::default();
let id_options = TextOptions::default().set_stored().set_indexing_options(
TextFieldIndexing::default()
.set_tokenizer("raw")
.set_index_option(IndexRecordOption::Basic),
);
let timestamp_options = IntOptions::default()
.set_stored()
.set_indexed()
.set_fast(SingleValue);
schema_builder.add_text_field("_id", id_options);
schema_builder.add_date_field("_timestamp", timestamp_options);
let schema_content = r#"[
{
"name": "text",
"type": "text",
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
},
"stored": false
}
},
{
"name": "popularity",
"type": "i64",
"options": {
"indexed": false,
"fast": "single",
"stored": true
}
}
]"#;
let tmp_schema: Schema =
serde_json::from_str(&schema_content).expect("error while reading json");
for (_field, field_entry) in tmp_schema.fields() {
schema_builder.add_field(field_entry.clone());
}
let schema = schema_builder.build();
let schema_json = serde_json::to_string_pretty(&schema).unwrap();
let expected = r#"[
{
"name": "_id",
"type": "text",
"options": {
"indexing": {
"record": "basic",
"tokenizer": "raw"
},
"stored": true
}
},
{
"name": "_timestamp",
"type": "date",
"options": {
"indexed": true,
"fast": "single",
"stored": true
}
},
{
"name": "text",
"type": "text",
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
},
"stored": false
}
},
{
"name": "popularity",
"type": "i64",
"options": {
"indexed": false,
"fast": "single",
"stored": true
}
}
]"#;
assert_eq!(schema_json, expected);
}
}

View File

@@ -57,7 +57,7 @@ use self::compression_snap::{compress, decompress};
pub mod tests {
use super::*;
use crate::directory::{Directory, RAMDirectory, WritePtr};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
use crate::schema::Document;
use crate::schema::FieldValue;
use crate::schema::Schema;

View File

@@ -36,7 +36,7 @@ pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
mod tests {
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use crate::core::Index;
use crate::directory::{Directory, RAMDirectory, ReadOnlySource};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, ReadOnlySource};
use crate::postings::TermInfo;
use crate::schema::{Document, FieldType, Schema, TEXT};
use std::path::PathBuf;

View File

@@ -1,6 +1,8 @@
use fail;
use std::path::Path;
use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory, TerminatingWrite};
use tantivy::directory::{
Directory, ManagedDirectory, RAMDirectory, ReadOnlyDirectory, TerminatingWrite,
};
use tantivy::doc;
use tantivy::schema::{Schema, TEXT};
use tantivy::{Index, Term};