mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-29 21:42:55 +00:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7973892a2 | ||
|
|
cd7484c035 | ||
|
|
7ed6bc8718 | ||
|
|
d12a06b65b | ||
|
|
749432f949 | ||
|
|
c1400f25a7 | ||
|
|
87120acf7c | ||
|
|
401f74f7ae | ||
|
|
03d31f6713 | ||
|
|
a57faf07f6 | ||
|
|
562ea9a839 | ||
|
|
cf92cc1ada | ||
|
|
f6000aece7 | ||
|
|
2b3fe3a2b5 | ||
|
|
0fde90faac |
14
CHANGELOG.md
14
CHANGELOG.md
@@ -1,3 +1,17 @@
|
||||
Tantivy 0.11.3
|
||||
=======================
|
||||
- Fixed DateTime as a fast field (#735)
|
||||
|
||||
Tantivy 0.11.2
|
||||
=======================
|
||||
- The future returned by `IndexWriter::merge` does not borrow `self` mutably anymore (#732)
|
||||
- Exposing a constructor for `WatchHandle` (#731)
|
||||
|
||||
Tantivy 0.11.1
|
||||
=====================
|
||||
- Bug fix #729
|
||||
|
||||
|
||||
Tantivy 0.11.0
|
||||
=====================
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy"
|
||||
version = "0.11.0"
|
||||
version = "0.11.3"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = ["database-implementations", "data-structures"]
|
||||
|
||||
@@ -13,63 +13,100 @@
|
||||
// ---
|
||||
// Importing tantivy...
|
||||
use tantivy::collector::FacetCollector;
|
||||
use tantivy::query::AllQuery;
|
||||
use tantivy::query::{AllQuery, TermQuery};
|
||||
use tantivy::schema::*;
|
||||
use tantivy::{doc, Index};
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn main() -> tantivy::Result<()> {
|
||||
// Let's create a temporary directory for the
|
||||
// sake of this example
|
||||
let index_path = TempDir::new()?;
|
||||
// Let's create a temporary directory for the sake of this example
|
||||
let mut schema_builder = Schema::builder();
|
||||
|
||||
schema_builder.add_text_field("name", TEXT | STORED);
|
||||
|
||||
// this is our faceted field
|
||||
schema_builder.add_facet_field("tags");
|
||||
let name = schema_builder.add_text_field("felin_name", TEXT | STORED);
|
||||
// this is our faceted field: its scientific classification
|
||||
let classification = schema_builder.add_facet_field("classification");
|
||||
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
|
||||
let index = Index::create_in_dir(&index_path, schema.clone())?;
|
||||
|
||||
let mut index_writer = index.writer(50_000_000)?;
|
||||
|
||||
let name = schema.get_field("name").unwrap();
|
||||
let tags = schema.get_field("tags").unwrap();
|
||||
let mut index_writer = index.writer(30_000_000)?;
|
||||
|
||||
// For convenience, tantivy also comes with a macro to
|
||||
// reduce the boilerplate above.
|
||||
index_writer.add_document(doc!(
|
||||
name => "the ditch",
|
||||
tags => Facet::from("/pools/north")
|
||||
name => "Cat",
|
||||
classification => Facet::from("/Felidae/Felinae/Felis")
|
||||
));
|
||||
|
||||
index_writer.add_document(doc!(
|
||||
name => "little stacey",
|
||||
tags => Facet::from("/pools/south")
|
||||
name => "Canada lynx",
|
||||
classification => Facet::from("/Felidae/Felinae/Lynx")
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
name => "Cheetah",
|
||||
classification => Facet::from("/Felidae/Felinae/Acinonyx")
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
name => "Tiger",
|
||||
classification => Facet::from("/Felidae/Pantherinae/Panthera")
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
name => "Lion",
|
||||
classification => Facet::from("/Felidae/Pantherinae/Panthera")
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
name => "Jaguar",
|
||||
classification => Facet::from("/Felidae/Pantherinae/Panthera")
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
name => "Sunda clouded leopard",
|
||||
classification => Facet::from("/Felidae/Pantherinae/Neofelis")
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
name => "Fossa",
|
||||
classification => Facet::from("/Eupleridae/Cryptoprocta")
|
||||
));
|
||||
|
||||
index_writer.commit()?;
|
||||
|
||||
let reader = index.reader()?;
|
||||
|
||||
let searcher = reader.searcher();
|
||||
{
|
||||
let mut facet_collector = FacetCollector::for_field(classification);
|
||||
facet_collector.add_facet("/Felidae");
|
||||
let facet_counts = searcher.search(&AllQuery, &facet_collector)?;
|
||||
// This lists all of the facet counts, right below "/Felidae".
|
||||
let facets: Vec<(&Facet, u64)> = facet_counts.get("/Felidae").collect();
|
||||
assert_eq!(
|
||||
facets,
|
||||
vec![
|
||||
(&Facet::from("/Felidae/Felinae"), 3),
|
||||
(&Facet::from("/Felidae/Pantherinae"), 4),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
let mut facet_collector = FacetCollector::for_field(tags);
|
||||
facet_collector.add_facet("/pools");
|
||||
// Facets are also searchable.
|
||||
//
|
||||
// For instance a common UI pattern is to allow the user someone to click on a facet link
|
||||
// (e.g: `Pantherinae`) to drill down and filter the current result set with this subfacet.
|
||||
//
|
||||
// The search would then look as follows.
|
||||
|
||||
let facet_counts = searcher.search(&AllQuery, &facet_collector).unwrap();
|
||||
|
||||
// This lists all of the facet counts
|
||||
let facets: Vec<(&Facet, u64)> = facet_counts.get("/pools").collect();
|
||||
assert_eq!(
|
||||
facets,
|
||||
vec![
|
||||
(&Facet::from("/pools/north"), 1),
|
||||
(&Facet::from("/pools/south"), 1),
|
||||
]
|
||||
);
|
||||
// Check the reference doc for different ways to create a `Facet` object.
|
||||
{
|
||||
let facet = Facet::from_text("/Felidae/Pantherinae");
|
||||
let facet_term = Term::from_facet(classification, &facet);
|
||||
let facet_term_query = TermQuery::new(facet_term, IndexRecordOption::Basic);
|
||||
let mut facet_collector = FacetCollector::for_field(classification);
|
||||
facet_collector.add_facet("/Felidae/Pantherinae");
|
||||
let facet_counts = searcher.search(&facet_term_query, &facet_collector)?;
|
||||
let facets: Vec<(&Facet, u64)> = facet_counts.get("/Felidae/Pantherinae").collect();
|
||||
assert_eq!(
|
||||
facets,
|
||||
vec![
|
||||
(&Facet::from("/Felidae/Pantherinae/Neofelis"), 1),
|
||||
(&Facet::from("/Felidae/Pantherinae/Panthera"), 3),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -452,9 +452,11 @@ impl FacetCounts {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{FacetCollector, FacetCounts};
|
||||
use crate::collector::Count;
|
||||
use crate::core::Index;
|
||||
use crate::query::AllQuery;
|
||||
use crate::schema::{Document, Facet, Field, Schema};
|
||||
use crate::query::{AllQuery, QueryParser, TermQuery};
|
||||
use crate::schema::{Document, Facet, Field, IndexRecordOption, Schema};
|
||||
use crate::Term;
|
||||
use rand::distributions::Uniform;
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::{thread_rng, Rng};
|
||||
@@ -544,6 +546,56 @@ mod tests {
|
||||
assert_eq!(facets[0].1, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_doc_search_by_facet() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let facet_field = schema_builder.add_facet_field("facet");
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
index_writer.add_document(doc!(
|
||||
facet_field => Facet::from_text(&"/A/A"),
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
facet_field => Facet::from_text(&"/A/B"),
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
facet_field => Facet::from_text(&"/A/C/A"),
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
facet_field => Facet::from_text(&"/D/C/A"),
|
||||
));
|
||||
index_writer.commit().unwrap();
|
||||
let reader = index.reader().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
assert_eq!(searcher.num_docs(), 4);
|
||||
|
||||
let count_facet = |facet_str: &str| {
|
||||
let term = Term::from_facet(facet_field, &Facet::from_text(facet_str));
|
||||
searcher
|
||||
.search(&TermQuery::new(term, IndexRecordOption::Basic), &Count)
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
assert_eq!(count_facet("/"), 4);
|
||||
assert_eq!(count_facet("/A"), 3);
|
||||
assert_eq!(count_facet("/A/B"), 1);
|
||||
assert_eq!(count_facet("/A/C"), 1);
|
||||
assert_eq!(count_facet("/A/C/A"), 1);
|
||||
assert_eq!(count_facet("/C/A"), 0);
|
||||
{
|
||||
let query_parser = QueryParser::for_index(&index, vec![]);
|
||||
{
|
||||
let query = query_parser.parse_query("facet:/A/B").unwrap();
|
||||
assert_eq!(1, searcher.search(&query, &Count).unwrap());
|
||||
}
|
||||
{
|
||||
let query = query_parser.parse_query("facet:/A").unwrap();
|
||||
assert_eq!(3, searcher.search(&query, &Count).unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_non_used_facet_collector() {
|
||||
let mut facet_collector = FacetCollector::for_field(Field::from_field_id(0));
|
||||
|
||||
@@ -186,7 +186,7 @@ mod test {
|
||||
use super::{CompositeFile, CompositeWrite};
|
||||
use crate::common::BinarySerializable;
|
||||
use crate::common::VInt;
|
||||
use crate::directory::{Directory, RAMDirectory};
|
||||
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory};
|
||||
use crate::schema::Field;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
|
||||
@@ -338,7 +338,7 @@ impl Index {
|
||||
|
||||
/// Creates a new segment.
|
||||
pub fn new_segment(&self) -> Segment {
|
||||
let segment_meta = self
|
||||
let mut segment_meta = self
|
||||
.inventory
|
||||
.new_segment_meta(SegmentId::generate_random(), 0);
|
||||
self.segment(segment_meta)
|
||||
|
||||
@@ -35,6 +35,7 @@ impl SegmentMetaInventory {
|
||||
segment_id,
|
||||
max_doc,
|
||||
deletes: None,
|
||||
bundled: false,
|
||||
};
|
||||
SegmentMeta::from(self.inventory.track(inner))
|
||||
}
|
||||
@@ -81,6 +82,19 @@ impl SegmentMeta {
|
||||
self.tracked.segment_id
|
||||
}
|
||||
|
||||
pub fn with_bundled(self) -> SegmentMeta {
|
||||
SegmentMeta::from(self.tracked.map(|inner| InnerSegmentMeta {
|
||||
segment_id: inner.segment_id,
|
||||
max_doc: inner.max_doc,
|
||||
deletes: inner.deletes.clone(),
|
||||
bundled: true,
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn is_bundled(&self) -> bool {
|
||||
self.tracked.bundled
|
||||
}
|
||||
|
||||
/// Returns the number of deleted documents.
|
||||
pub fn num_deleted_docs(&self) -> u32 {
|
||||
self.tracked
|
||||
@@ -107,8 +121,12 @@ impl SegmentMeta {
|
||||
/// It just joins the segment id with the extension
|
||||
/// associated to a segment component.
|
||||
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
|
||||
let mut path = self.id().uuid_string();
|
||||
path.push_str(&*match component {
|
||||
let suffix = self.suffix(component);
|
||||
self.relative_path_from_suffix(&suffix)
|
||||
}
|
||||
|
||||
fn suffix(&self, component: SegmentComponent) -> String {
|
||||
match component {
|
||||
SegmentComponent::POSTINGS => ".idx".to_string(),
|
||||
SegmentComponent::POSITIONS => ".pos".to_string(),
|
||||
SegmentComponent::POSITIONSSKIP => ".posidx".to_string(),
|
||||
@@ -117,7 +135,17 @@ impl SegmentMeta {
|
||||
SegmentComponent::FASTFIELDS => ".fast".to_string(),
|
||||
SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
|
||||
SegmentComponent::DELETE => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the relative path of a component of our segment.
|
||||
///
|
||||
/// It just joins the segment id with the extension
|
||||
/// associated to a segment component.
|
||||
pub fn relative_path_from_suffix(&self, suffix: &str) -> PathBuf {
|
||||
let mut path = self.id().uuid_string();
|
||||
path.push_str(".");
|
||||
path.push_str(&suffix);
|
||||
PathBuf::from(path)
|
||||
}
|
||||
|
||||
@@ -161,6 +189,7 @@ impl SegmentMeta {
|
||||
segment_id: inner_meta.segment_id,
|
||||
max_doc,
|
||||
deletes: None,
|
||||
bundled: inner_meta.bundled,
|
||||
});
|
||||
SegmentMeta { tracked }
|
||||
}
|
||||
@@ -175,6 +204,7 @@ impl SegmentMeta {
|
||||
segment_id: inner_meta.segment_id,
|
||||
max_doc: inner_meta.max_doc,
|
||||
deletes: Some(delete_meta),
|
||||
bundled: inner_meta.bundled,
|
||||
});
|
||||
SegmentMeta { tracked }
|
||||
}
|
||||
@@ -185,6 +215,7 @@ struct InnerSegmentMeta {
|
||||
segment_id: SegmentId,
|
||||
max_doc: u32,
|
||||
deletes: Option<DeleteMeta>,
|
||||
bundled: bool,
|
||||
}
|
||||
|
||||
impl InnerSegmentMeta {
|
||||
|
||||
@@ -4,14 +4,12 @@ use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::directory::error::{OpenReadError, OpenWriteError};
|
||||
use crate::directory::Directory;
|
||||
use crate::directory::{ReadOnlySource, WritePtr};
|
||||
use crate::directory::{ReadOnlyDirectory, ReadOnlySource, WritePtr};
|
||||
use crate::indexer::segment_serializer::SegmentSerializer;
|
||||
use crate::schema::Schema;
|
||||
use crate::Opstamp;
|
||||
use crate::Result;
|
||||
use std::fmt;
|
||||
use std::path::PathBuf;
|
||||
use std::result;
|
||||
|
||||
/// A segment is a piece of the index.
|
||||
#[derive(Clone)]
|
||||
@@ -83,23 +81,30 @@ impl Segment {
|
||||
}
|
||||
|
||||
/// Open one of the component file for a *regular* read.
|
||||
pub fn open_read(
|
||||
&self,
|
||||
component: SegmentComponent,
|
||||
) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||
pub fn open_read(&self, component: SegmentComponent) -> Result<ReadOnlySource, OpenReadError> {
|
||||
let path = self.relative_path(component);
|
||||
let source = self.index.directory().open_read(&path)?;
|
||||
Ok(source)
|
||||
}
|
||||
|
||||
/// Open one of the component file for *regular* write.
|
||||
pub fn open_write(
|
||||
pub fn open_write(&mut self, component: SegmentComponent) -> Result<WritePtr, OpenWriteError> {
|
||||
let path = self.relative_path(component);
|
||||
self.index.directory_mut().open_write(&path)
|
||||
}
|
||||
|
||||
pub fn open_bundle_writer(&mut self) -> Result<WritePtr, OpenWriteError> {
|
||||
let path = self.meta.relative_path_from_suffix("bundle");
|
||||
self.index.directory_mut().open_write(&path)
|
||||
}
|
||||
|
||||
pub(crate) fn open_write_in_directory(
|
||||
&mut self,
|
||||
component: SegmentComponent,
|
||||
) -> result::Result<WritePtr, OpenWriteError> {
|
||||
directory: &mut dyn Directory,
|
||||
) -> Result<WritePtr, OpenWriteError> {
|
||||
let path = self.relative_path(component);
|
||||
let write = self.index.directory_mut().open_write(&path)?;
|
||||
Ok(write)
|
||||
directory.open_write(&path)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,5 +114,5 @@ pub trait SerializableSegment {
|
||||
///
|
||||
/// # Returns
|
||||
/// The number of documents in the segment.
|
||||
fn write(&self, serializer: SegmentSerializer) -> Result<u32>;
|
||||
fn write(&self, serializer: SegmentSerializer) -> crate::Result<u32>;
|
||||
}
|
||||
|
||||
97
src/directory/bundle_directory.rs
Normal file
97
src/directory/bundle_directory.rs
Normal file
@@ -0,0 +1,97 @@
|
||||
use crate::directory::directory::ReadOnlyDirectory;
|
||||
use crate::directory::error::OpenReadError;
|
||||
use crate::directory::ReadOnlySource;
|
||||
use crate::error::DataCorruption;
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct BundleDirectory {
|
||||
source_map: Arc<HashMap<PathBuf, ReadOnlySource>>,
|
||||
}
|
||||
|
||||
impl BundleDirectory {
|
||||
pub fn from_source(source: ReadOnlySource) -> Result<BundleDirectory, DataCorruption> {
|
||||
let mut index_offset_buf = [0u8; 8];
|
||||
let (body_idx, footer_offset) = source.split_from_end(8);
|
||||
index_offset_buf.copy_from_slice(footer_offset.as_slice());
|
||||
let offset = u64::from_le_bytes(index_offset_buf);
|
||||
let (body_source, idx_source) = body_idx.split(offset as usize);
|
||||
let idx: HashMap<PathBuf, (u64, u64)> = serde_json::from_slice(idx_source.as_slice())
|
||||
.map_err(|err| {
|
||||
let msg = format!("Failed to read index from bundle. {:?}", err);
|
||||
DataCorruption::comment_only(msg)
|
||||
})?;
|
||||
let source_map: HashMap<PathBuf, ReadOnlySource> = idx
|
||||
.into_iter()
|
||||
.map(|(path, (start, stop))| {
|
||||
let source = body_source.slice(start as usize, stop as usize);
|
||||
(path, source)
|
||||
})
|
||||
.collect();
|
||||
Ok(BundleDirectory {
|
||||
source_map: Arc::new(source_map),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl ReadOnlyDirectory for BundleDirectory {
|
||||
fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
|
||||
self.source_map
|
||||
.get(path)
|
||||
.cloned()
|
||||
.ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
self.source_map.contains_key(path)
|
||||
}
|
||||
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||
let source = self
|
||||
.source_map
|
||||
.get(path)
|
||||
.ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?;
|
||||
Ok(source.as_slice().to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::BundleDirectory;
|
||||
use crate::directory::{RAMDirectory, ReadOnlyDirectory, TerminatingWrite};
|
||||
use crate::Directory;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
|
||||
#[test]
|
||||
fn test_bundle_directory() {
|
||||
let mut ram_directory = RAMDirectory::default();
|
||||
let test_path_atomic = Path::new("testpath_atomic");
|
||||
let test_path_wrt = Path::new("testpath_wrt");
|
||||
assert!(ram_directory
|
||||
.atomic_write(test_path_atomic, b"titi")
|
||||
.is_ok());
|
||||
{
|
||||
let mut test_wrt = ram_directory.open_write(test_path_wrt).unwrap();
|
||||
assert!(test_wrt.write_all(b"toto").is_ok());
|
||||
assert!(test_wrt.terminate().is_ok());
|
||||
}
|
||||
let mut dest_directory = RAMDirectory::default();
|
||||
let bundle_path = Path::new("bundle");
|
||||
let mut wrt = dest_directory.open_write(bundle_path).unwrap();
|
||||
assert!(ram_directory.serialize_bundle(&mut wrt).is_ok());
|
||||
assert!(wrt.terminate().is_ok());
|
||||
let source = dest_directory.open_read(bundle_path).unwrap();
|
||||
let bundle_directory = BundleDirectory::from_source(source).unwrap();
|
||||
assert_eq!(
|
||||
&bundle_directory.atomic_read(test_path_atomic).unwrap()[..],
|
||||
b"titi"
|
||||
);
|
||||
assert_eq!(
|
||||
&bundle_directory.open_read(test_path_wrt).unwrap()[..],
|
||||
b"toto"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -100,6 +100,30 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ReadOnlyDirectory {
|
||||
/// Opens a virtual file for read.
|
||||
///
|
||||
/// Once a virtual file is open, its data may not
|
||||
/// change.
|
||||
///
|
||||
/// Specifically, subsequent writes or flushes should
|
||||
/// have no effect on the returned `ReadOnlySource` object.
|
||||
///
|
||||
/// You should only use this to read files create with [Directory::open_write].
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError>;
|
||||
|
||||
/// Returns true iff the file exists
|
||||
fn exists(&self, path: &Path) -> bool;
|
||||
|
||||
/// Reads the full content file that has been written using
|
||||
/// atomic_write.
|
||||
///
|
||||
/// This should only be used for small files.
|
||||
///
|
||||
/// You should only use this to read files create with [Directory::atomic_write].
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
|
||||
}
|
||||
|
||||
/// Write-once read many (WORM) abstraction for where
|
||||
/// tantivy's data should be stored.
|
||||
///
|
||||
@@ -110,18 +134,9 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
|
||||
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
|
||||
/// should be used mostly for tests.
|
||||
///
|
||||
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
/// Opens a virtual file for read.
|
||||
///
|
||||
/// Once a virtual file is open, its data may not
|
||||
/// change.
|
||||
///
|
||||
/// Specifically, subsequent writes or flushes should
|
||||
/// have no effect on the returned `ReadOnlySource` object.
|
||||
///
|
||||
/// You should only use this to read files create with [`open_write`]
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError>;
|
||||
|
||||
pub trait Directory:
|
||||
DirectoryClone + ReadOnlyDirectory + fmt::Debug + Send + Sync + 'static
|
||||
{
|
||||
/// Removes a file
|
||||
///
|
||||
/// Removing a file will not affect an eventual
|
||||
@@ -131,9 +146,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
/// `DeleteError::DoesNotExist`.
|
||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError>;
|
||||
|
||||
/// Returns true iff the file exists
|
||||
fn exists(&self, path: &Path) -> bool;
|
||||
|
||||
/// Opens a writer for the *virtual file* associated with
|
||||
/// a Path.
|
||||
///
|
||||
@@ -155,14 +167,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
/// The file may not previously exist.
|
||||
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError>;
|
||||
|
||||
/// Reads the full content file that has been written using
|
||||
/// atomic_write.
|
||||
///
|
||||
/// This should only be used for small files.
|
||||
///
|
||||
/// You should only use this to read files create with [`atomic_write`]
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
|
||||
|
||||
/// Atomically replace the content of a file with data.
|
||||
///
|
||||
/// This calls ensure that reads can never *observe*
|
||||
@@ -197,7 +201,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
/// Registers a callback that will be called whenever a change on the `meta.json`
|
||||
/// using the `atomic_write` API is detected.
|
||||
///
|
||||
/// The behavior when using `.watch()` on a file using `.open_write(...)` is, on the other
|
||||
/// The behavior when using `.watch()` on a file using [Directory::open_write] is, on the other
|
||||
/// hand, undefined.
|
||||
///
|
||||
/// The file will be watched for the lifetime of the returned `WatchHandle`. The caller is
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::directory::{WatchCallback, WatchHandle};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::Directory;
|
||||
|
||||
use crate::directory::directory::ReadOnlyDirectory;
|
||||
use crc32fast::Hasher;
|
||||
use serde_json;
|
||||
use std::collections::HashSet;
|
||||
@@ -264,14 +265,6 @@ impl ManagedDirectory {
|
||||
}
|
||||
|
||||
impl Directory for ManagedDirectory {
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||
let read_only_source = self.directory.open_read(path)?;
|
||||
let (footer, reader) = Footer::extract_footer(read_only_source)
|
||||
.map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
|
||||
footer.is_compatible()?;
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
|
||||
self.register_file_as_managed(path)
|
||||
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
|
||||
@@ -289,18 +282,10 @@ impl Directory for ManagedDirectory {
|
||||
self.directory.atomic_write(path, data)
|
||||
}
|
||||
|
||||
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
|
||||
self.directory.atomic_read(path)
|
||||
}
|
||||
|
||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
||||
self.directory.delete(path)
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
self.directory.exists(path)
|
||||
}
|
||||
|
||||
fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
|
||||
self.directory.acquire_lock(lock)
|
||||
}
|
||||
@@ -310,6 +295,24 @@ impl Directory for ManagedDirectory {
|
||||
}
|
||||
}
|
||||
|
||||
impl ReadOnlyDirectory for ManagedDirectory {
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||
let read_only_source = self.directory.open_read(path)?;
|
||||
let (footer, reader) = Footer::extract_footer(read_only_source)
|
||||
.map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
|
||||
footer.is_compatible()?;
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
self.directory.exists(path)
|
||||
}
|
||||
|
||||
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
|
||||
self.directory.atomic_read(path)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for ManagedDirectory {
|
||||
fn clone(&self) -> ManagedDirectory {
|
||||
ManagedDirectory {
|
||||
@@ -323,7 +326,9 @@ impl Clone for ManagedDirectory {
|
||||
#[cfg(test)]
|
||||
mod tests_mmap_specific {
|
||||
|
||||
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};
|
||||
use crate::directory::{
|
||||
Directory, ManagedDirectory, MmapDirectory, ReadOnlyDirectory, TerminatingWrite,
|
||||
};
|
||||
use std::collections::HashSet;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Write;
|
||||
|
||||
@@ -6,6 +6,7 @@ use self::notify::RawEvent;
|
||||
use self::notify::RecursiveMode;
|
||||
use self::notify::Watcher;
|
||||
use crate::core::META_FILEPATH;
|
||||
use crate::directory::directory::ReadOnlyDirectory;
|
||||
use crate::directory::error::LockError;
|
||||
use crate::directory::error::{
|
||||
DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError,
|
||||
@@ -131,14 +132,13 @@ impl MmapCache {
|
||||
}
|
||||
self.cache.remove(full_path);
|
||||
self.counters.miss += 1;
|
||||
Ok(if let Some(mmap) = open_mmap(full_path)? {
|
||||
let mmap_opt = open_mmap(full_path)?;
|
||||
Ok(mmap_opt.map(|mmap| {
|
||||
let mmap_arc: Arc<BoxedData> = Arc::new(Box::new(mmap));
|
||||
let mmap_weak = Arc::downgrade(&mmap_arc);
|
||||
self.cache.insert(full_path.to_owned(), mmap_weak);
|
||||
Some(mmap_arc)
|
||||
} else {
|
||||
None
|
||||
})
|
||||
mmap_arc
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -408,24 +408,6 @@ impl TerminatingWrite for SafeFileWriter {
|
||||
}
|
||||
|
||||
impl Directory for MmapDirectory {
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||
debug!("Open Read {:?}", path);
|
||||
let full_path = self.resolve_path(path);
|
||||
|
||||
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
|
||||
let msg = format!(
|
||||
"Failed to acquired write lock \
|
||||
on mmap cache while reading {:?}",
|
||||
path
|
||||
);
|
||||
IOError::with_path(path.to_owned(), make_io_err(msg))
|
||||
})?;
|
||||
Ok(mmap_cache
|
||||
.get_mmap(&full_path)?
|
||||
.map(ReadOnlySource::from)
|
||||
.unwrap_or_else(ReadOnlySource::empty))
|
||||
}
|
||||
|
||||
/// Any entry associated to the path in the mmap will be
|
||||
/// removed before the file is deleted.
|
||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
||||
@@ -444,11 +426,6 @@ impl Directory for MmapDirectory {
|
||||
}
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
let full_path = self.resolve_path(path);
|
||||
full_path.exists()
|
||||
}
|
||||
|
||||
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||
debug!("Open Write {:?}", path);
|
||||
let full_path = self.resolve_path(path);
|
||||
@@ -479,25 +456,6 @@ impl Directory for MmapDirectory {
|
||||
Ok(BufWriter::new(Box::new(writer)))
|
||||
}
|
||||
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||
let full_path = self.resolve_path(path);
|
||||
let mut buffer = Vec::new();
|
||||
match File::open(&full_path) {
|
||||
Ok(mut file) => {
|
||||
file.read_to_end(&mut buffer)
|
||||
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
|
||||
Ok(buffer)
|
||||
}
|
||||
Err(e) => {
|
||||
if e.kind() == io::ErrorKind::NotFound {
|
||||
Err(OpenReadError::FileDoesNotExist(path.to_owned()))
|
||||
} else {
|
||||
Err(IOError::with_path(path.to_owned(), e).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
|
||||
debug!("Atomic Write {:?}", path);
|
||||
let full_path = self.resolve_path(path);
|
||||
@@ -531,6 +489,50 @@ impl Directory for MmapDirectory {
|
||||
}
|
||||
}
|
||||
|
||||
impl ReadOnlyDirectory for MmapDirectory {
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||
debug!("Open Read {:?}", path);
|
||||
let full_path = self.resolve_path(path);
|
||||
|
||||
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
|
||||
let msg = format!(
|
||||
"Failed to acquired write lock \
|
||||
on mmap cache while reading {:?}",
|
||||
path
|
||||
);
|
||||
IOError::with_path(path.to_owned(), make_io_err(msg))
|
||||
})?;
|
||||
Ok(mmap_cache
|
||||
.get_mmap(&full_path)?
|
||||
.map(ReadOnlySource::from)
|
||||
.unwrap_or_else(ReadOnlySource::empty))
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
let full_path = self.resolve_path(path);
|
||||
full_path.exists()
|
||||
}
|
||||
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||
let full_path = self.resolve_path(path);
|
||||
let mut buffer = Vec::new();
|
||||
match File::open(&full_path) {
|
||||
Ok(mut file) => {
|
||||
file.read_to_end(&mut buffer)
|
||||
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
|
||||
Ok(buffer)
|
||||
}
|
||||
Err(e) => {
|
||||
if e.kind() == io::ErrorKind::NotFound {
|
||||
Err(OpenReadError::FileDoesNotExist(path.to_owned()))
|
||||
} else {
|
||||
Err(IOError::with_path(path.to_owned(), e).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ WORM directory abstraction.
|
||||
#[cfg(feature = "mmap")]
|
||||
mod mmap_directory;
|
||||
|
||||
mod bundle_directory;
|
||||
mod directory;
|
||||
mod directory_lock;
|
||||
mod footer;
|
||||
@@ -19,15 +20,13 @@ mod watch_event_router;
|
||||
pub mod error;
|
||||
|
||||
pub use self::directory::DirectoryLock;
|
||||
pub use self::directory::{Directory, DirectoryClone};
|
||||
pub use self::directory::{Directory, DirectoryClone, ReadOnlyDirectory};
|
||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
||||
pub use self::ram_directory::RAMDirectory;
|
||||
pub use self::read_only_source::ReadOnlySource;
|
||||
pub(crate) use self::watch_event_router::WatchCallbackList;
|
||||
pub use self::watch_event_router::{WatchCallback, WatchHandle};
|
||||
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
|
||||
use std::io::{self, BufWriter, Write};
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Outcome of the Garbage collection
|
||||
pub struct GarbageCollectionResult {
|
||||
/// List of files that were deleted in this cycle
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use crate::common::CountingWriter;
|
||||
use crate::core::META_FILEPATH;
|
||||
use crate::directory::directory::ReadOnlyDirectory;
|
||||
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
|
||||
use crate::directory::AntiCallToken;
|
||||
use crate::directory::WatchCallbackList;
|
||||
@@ -115,6 +117,22 @@ impl InnerDirectory {
|
||||
fn total_mem_usage(&self) -> usize {
|
||||
self.fs.values().map(|f| f.len()).sum()
|
||||
}
|
||||
|
||||
fn serialize_bundle(&self, wrt: &mut WritePtr) -> io::Result<()> {
|
||||
let mut counting_writer = CountingWriter::wrap(wrt);
|
||||
let mut file_index: HashMap<PathBuf, (u64, u64)> = HashMap::default();
|
||||
for (path, source) in &self.fs {
|
||||
let start = counting_writer.written_bytes();
|
||||
counting_writer.write_all(source.as_slice())?;
|
||||
let stop = counting_writer.written_bytes();
|
||||
file_index.insert(path.to_path_buf(), (start, stop));
|
||||
}
|
||||
let index_offset = counting_writer.written_bytes();
|
||||
serde_json::to_writer(&mut counting_writer, &file_index)?;
|
||||
let index_offset_buffer = index_offset.to_le_bytes();
|
||||
counting_writer.write_all(&index_offset_buffer[..])?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for RAMDirectory {
|
||||
@@ -144,13 +162,18 @@ impl RAMDirectory {
|
||||
pub fn total_mem_usage(&self) -> usize {
|
||||
self.fs.read().unwrap().total_mem_usage()
|
||||
}
|
||||
|
||||
/// Serialize the RAMDirectory into a bundle.
|
||||
///
|
||||
/// This method will fail, write nothing, and return an error if a
|
||||
/// clone of this repository exists.
|
||||
pub fn serialize_bundle(self, wrt: &mut WritePtr) -> io::Result<()> {
|
||||
let inner_directory_rlock = self.fs.read().unwrap();
|
||||
inner_directory_rlock.serialize_bundle(wrt)
|
||||
}
|
||||
}
|
||||
|
||||
impl Directory for RAMDirectory {
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||
self.fs.read().unwrap().open_read(path)
|
||||
}
|
||||
|
||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
||||
fail_point!("RAMDirectory::delete", |_| {
|
||||
use crate::directory::error::IOError;
|
||||
@@ -160,10 +183,6 @@ impl Directory for RAMDirectory {
|
||||
self.fs.write().unwrap().delete(path)
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
self.fs.read().unwrap().exists(path)
|
||||
}
|
||||
|
||||
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||
let mut fs = self.fs.write().unwrap();
|
||||
let path_buf = PathBuf::from(path);
|
||||
@@ -177,10 +196,6 @@ impl Directory for RAMDirectory {
|
||||
}
|
||||
}
|
||||
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||
Ok(self.open_read(path)?.as_slice().to_owned())
|
||||
}
|
||||
|
||||
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
|
||||
fail_point!("RAMDirectory::atomic_write", |msg| Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
@@ -204,3 +219,17 @@ impl Directory for RAMDirectory {
|
||||
Ok(self.fs.write().unwrap().watch(watch_callback))
|
||||
}
|
||||
}
|
||||
|
||||
impl ReadOnlyDirectory for RAMDirectory {
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||
self.fs.read().unwrap().open_read(path)
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
self.fs.read().unwrap().exists(path)
|
||||
}
|
||||
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||
Ok(self.open_read(path)?.as_slice().to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,13 +24,20 @@ pub struct WatchCallbackList {
|
||||
#[derive(Clone)]
|
||||
pub struct WatchHandle(Arc<WatchCallback>);
|
||||
|
||||
impl WatchHandle {
|
||||
/// Create a WatchHandle handle.
|
||||
pub fn new(watch_callback: Arc<WatchCallback>) -> WatchHandle {
|
||||
WatchHandle(watch_callback)
|
||||
}
|
||||
}
|
||||
|
||||
impl WatchCallbackList {
|
||||
/// Suscribes a new callback and returns a handle that controls the lifetime of the callback.
|
||||
pub fn subscribe(&self, watch_callback: WatchCallback) -> WatchHandle {
|
||||
let watch_callback_arc = Arc::new(watch_callback);
|
||||
let watch_callback_weak = Arc::downgrade(&watch_callback_arc);
|
||||
self.router.write().unwrap().push(watch_callback_weak);
|
||||
WatchHandle(watch_callback_arc)
|
||||
WatchHandle::new(watch_callback_arc)
|
||||
}
|
||||
|
||||
fn list_callback(&self) -> Vec<Arc<WatchCallback>> {
|
||||
|
||||
@@ -25,10 +25,10 @@ impl DataCorruption {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn comment_only(comment: String) -> DataCorruption {
|
||||
pub fn comment_only<TS: ToString>(comment: TS) -> DataCorruption {
|
||||
DataCorruption {
|
||||
filepath: None,
|
||||
comment,
|
||||
comment: comment.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ pub use self::reader::FastFieldReader;
|
||||
pub use self::readers::FastFieldReaders;
|
||||
pub use self::serializer::FastFieldSerializer;
|
||||
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
|
||||
use crate::chrono::{NaiveDateTime, Utc};
|
||||
use crate::common;
|
||||
use crate::schema::Cardinality;
|
||||
use crate::schema::FieldType;
|
||||
@@ -49,7 +50,7 @@ mod serializer;
|
||||
mod writer;
|
||||
|
||||
/// Trait for types that are allowed for fast fields: (u64, i64 and f64).
|
||||
pub trait FastValue: Default + Clone + Copy + Send + Sync + PartialOrd {
|
||||
pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd {
|
||||
/// Converts a value from u64
|
||||
///
|
||||
/// Internally all fast field values are encoded as u64.
|
||||
@@ -69,6 +70,12 @@ pub trait FastValue: Default + Clone + Copy + Send + Sync + PartialOrd {
|
||||
/// Cast value to `u64`.
|
||||
/// The value is just reinterpreted in memory.
|
||||
fn as_u64(&self) -> u64;
|
||||
|
||||
/// Build a default value. This default value is never used, so the value does not
|
||||
/// really matter.
|
||||
fn make_zero() -> Self {
|
||||
Self::from_u64(0i64.to_u64())
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for u64 {
|
||||
@@ -135,11 +142,34 @@ impl FastValue for f64 {
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for crate::DateTime {
|
||||
fn from_u64(timestamp_u64: u64) -> Self {
|
||||
let timestamp_i64 = i64::from_u64(timestamp_u64);
|
||||
crate::DateTime::from_utc(NaiveDateTime::from_timestamp(timestamp_i64, 0), Utc)
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
self.timestamp().to_u64()
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::Date(ref integer_options) => integer_options.get_fastfield_cardinality(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
self.timestamp().as_u64()
|
||||
}
|
||||
}
|
||||
|
||||
fn value_to_u64(value: &Value) -> u64 {
|
||||
match *value {
|
||||
Value::U64(ref val) => *val,
|
||||
Value::I64(ref val) => common::i64_to_u64(*val),
|
||||
Value::F64(ref val) => common::f64_to_u64(*val),
|
||||
Value::Date(ref datetime) => common::i64_to_u64(datetime.timestamp()),
|
||||
_ => panic!("Expected a u64/i64/f64 field, got {:?} ", value),
|
||||
}
|
||||
}
|
||||
@@ -149,12 +179,14 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::common::CompositeFile;
|
||||
use crate::directory::{Directory, RAMDirectory, WritePtr};
|
||||
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
|
||||
use crate::fastfield::FastFieldReader;
|
||||
use crate::schema::Document;
|
||||
use crate::merge_policy::NoMergePolicy;
|
||||
use crate::schema::Field;
|
||||
use crate::schema::Schema;
|
||||
use crate::schema::FAST;
|
||||
use crate::schema::{Document, IntOptions};
|
||||
use crate::{Index, SegmentId, SegmentReader};
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::rngs::StdRng;
|
||||
@@ -178,6 +210,12 @@ mod tests {
|
||||
assert_eq!(test_fastfield.get(2), 300);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_fastfield_i64_u64() {
|
||||
let datetime = crate::DateTime::from_utc(NaiveDateTime::from_timestamp(0i64, 0), Utc);
|
||||
assert_eq!(i64::from_u64(datetime.to_u64()), 0i64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intfastfield_small() {
|
||||
let path = Path::new("test");
|
||||
@@ -429,6 +467,93 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_missing_date_fast_field() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let date_field = schema_builder.add_date_field("date", FAST);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
index_writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
index_writer.add_document(doc!(date_field =>crate::chrono::prelude::Utc::now()));
|
||||
index_writer.commit().unwrap();
|
||||
index_writer.add_document(doc!());
|
||||
index_writer.commit().unwrap();
|
||||
let reader = index.reader().unwrap();
|
||||
let segment_ids: Vec<SegmentId> = reader
|
||||
.searcher()
|
||||
.segment_readers()
|
||||
.iter()
|
||||
.map(SegmentReader::segment_id)
|
||||
.collect();
|
||||
assert_eq!(segment_ids.len(), 2);
|
||||
let merge_future = index_writer.merge(&segment_ids[..]);
|
||||
let merge_res = futures::executor::block_on(merge_future);
|
||||
assert!(merge_res.is_ok());
|
||||
assert!(reader.reload().is_ok());
|
||||
assert_eq!(reader.searcher().segment_readers().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_datetime() {
|
||||
assert_eq!(crate::DateTime::make_zero().timestamp(), 0i64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_datefastfield() {
|
||||
use crate::fastfield::FastValue;
|
||||
let mut schema_builder = Schema::builder();
|
||||
let date_field = schema_builder.add_date_field("date", FAST);
|
||||
let multi_date_field = schema_builder.add_date_field(
|
||||
"multi_date",
|
||||
IntOptions::default().set_fast(Cardinality::MultiValues),
|
||||
);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
index_writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
index_writer.add_document(doc!(
|
||||
date_field => crate::DateTime::from_u64(1i64.to_u64()),
|
||||
multi_date_field => crate::DateTime::from_u64(2i64.to_u64()),
|
||||
multi_date_field => crate::DateTime::from_u64(3i64.to_u64())
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
date_field => crate::DateTime::from_u64(4i64.to_u64())
|
||||
));
|
||||
index_writer.add_document(doc!(
|
||||
multi_date_field => crate::DateTime::from_u64(5i64.to_u64()),
|
||||
multi_date_field => crate::DateTime::from_u64(6i64.to_u64())
|
||||
));
|
||||
index_writer.commit().unwrap();
|
||||
let reader = index.reader().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
assert_eq!(searcher.segment_readers().len(), 1);
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
let fast_fields = segment_reader.fast_fields();
|
||||
let date_fast_field = fast_fields.date(date_field).unwrap();
|
||||
let dates_fast_field = fast_fields.dates(multi_date_field).unwrap();
|
||||
let mut dates = vec![];
|
||||
{
|
||||
assert_eq!(date_fast_field.get(0u32).timestamp(), 1i64);
|
||||
dates_fast_field.get_vals(0u32, &mut dates);
|
||||
assert_eq!(dates.len(), 2);
|
||||
assert_eq!(dates[0].timestamp(), 2i64);
|
||||
assert_eq!(dates[1].timestamp(), 3i64);
|
||||
}
|
||||
{
|
||||
assert_eq!(date_fast_field.get(1u32).timestamp(), 4i64);
|
||||
dates_fast_field.get_vals(1u32, &mut dates);
|
||||
assert!(dates.is_empty());
|
||||
}
|
||||
{
|
||||
assert_eq!(date_fast_field.get(2u32).timestamp(), 0i64);
|
||||
dates_fast_field.get_vals(2u32, &mut dates);
|
||||
assert_eq!(dates.len(), 2);
|
||||
assert_eq!(dates[0].timestamp(), 5i64);
|
||||
assert_eq!(dates[1].timestamp(), 6i64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
|
||||
@@ -45,7 +45,7 @@ impl<Item: FastValue> MultiValueIntFastFieldReader<Item> {
|
||||
pub fn get_vals(&self, doc: DocId, vals: &mut Vec<Item>) {
|
||||
let (start, stop) = self.range(doc);
|
||||
let len = (stop - start) as usize;
|
||||
vals.resize(len, Item::default());
|
||||
vals.resize(len, Item::make_zero());
|
||||
self.vals_reader.get_range_u64(start, &mut vals[..]);
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::common::compute_num_bits;
|
||||
use crate::common::BinarySerializable;
|
||||
use crate::common::CompositeFile;
|
||||
use crate::directory::ReadOnlySource;
|
||||
use crate::directory::{Directory, RAMDirectory, WritePtr};
|
||||
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
|
||||
use crate::fastfield::{FastFieldSerializer, FastFieldsWriter};
|
||||
use crate::schema::Schema;
|
||||
use crate::schema::FAST;
|
||||
|
||||
@@ -15,9 +15,11 @@ pub struct FastFieldReaders {
|
||||
fast_field_i64: HashMap<Field, FastFieldReader<i64>>,
|
||||
fast_field_u64: HashMap<Field, FastFieldReader<u64>>,
|
||||
fast_field_f64: HashMap<Field, FastFieldReader<f64>>,
|
||||
fast_field_date: HashMap<Field, FastFieldReader<crate::DateTime>>,
|
||||
fast_field_i64s: HashMap<Field, MultiValueIntFastFieldReader<i64>>,
|
||||
fast_field_u64s: HashMap<Field, MultiValueIntFastFieldReader<u64>>,
|
||||
fast_field_f64s: HashMap<Field, MultiValueIntFastFieldReader<f64>>,
|
||||
fast_field_dates: HashMap<Field, MultiValueIntFastFieldReader<crate::DateTime>>,
|
||||
fast_bytes: HashMap<Field, BytesFastFieldReader>,
|
||||
fast_fields_composite: CompositeFile,
|
||||
}
|
||||
@@ -26,6 +28,7 @@ enum FastType {
|
||||
I64,
|
||||
U64,
|
||||
F64,
|
||||
Date,
|
||||
}
|
||||
|
||||
fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, Cardinality)> {
|
||||
@@ -39,6 +42,9 @@ fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, Cardinality
|
||||
FieldType::F64(options) => options
|
||||
.get_fastfield_cardinality()
|
||||
.map(|cardinality| (FastType::F64, cardinality)),
|
||||
FieldType::Date(options) => options
|
||||
.get_fastfield_cardinality()
|
||||
.map(|cardinality| (FastType::Date, cardinality)),
|
||||
FieldType::HierarchicalFacet => Some((FastType::U64, Cardinality::MultiValues)),
|
||||
_ => None,
|
||||
}
|
||||
@@ -53,9 +59,11 @@ impl FastFieldReaders {
|
||||
fast_field_i64: Default::default(),
|
||||
fast_field_u64: Default::default(),
|
||||
fast_field_f64: Default::default(),
|
||||
fast_field_date: Default::default(),
|
||||
fast_field_i64s: Default::default(),
|
||||
fast_field_u64s: Default::default(),
|
||||
fast_field_f64s: Default::default(),
|
||||
fast_field_dates: Default::default(),
|
||||
fast_bytes: Default::default(),
|
||||
fast_fields_composite: fast_fields_composite.clone(),
|
||||
};
|
||||
@@ -95,6 +103,12 @@ impl FastFieldReaders {
|
||||
FastFieldReader::open(fast_field_data.clone()),
|
||||
);
|
||||
}
|
||||
FastType::Date => {
|
||||
fast_field_readers.fast_field_date.insert(
|
||||
field,
|
||||
FastFieldReader::open(fast_field_data.clone()),
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(From::from(FastFieldNotAvailableError::new(field_entry)));
|
||||
@@ -130,6 +144,14 @@ impl FastFieldReaders {
|
||||
.fast_field_f64s
|
||||
.insert(field, multivalued_int_fast_field);
|
||||
}
|
||||
FastType::Date => {
|
||||
let vals_reader = FastFieldReader::open(fast_field_data);
|
||||
let multivalued_int_fast_field =
|
||||
MultiValueIntFastFieldReader::open(idx_reader, vals_reader);
|
||||
fast_field_readers
|
||||
.fast_field_dates
|
||||
.insert(field, multivalued_int_fast_field);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(From::from(FastFieldNotAvailableError::new(field_entry)));
|
||||
@@ -167,6 +189,9 @@ impl FastFieldReaders {
|
||||
if let Some(f64_ff_reader) = self.f64(field) {
|
||||
return Some(f64_ff_reader.into_u64_reader());
|
||||
}
|
||||
if let Some(date_ff_reader) = self.date(field) {
|
||||
return Some(date_ff_reader.into_u64_reader());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
@@ -177,6 +202,13 @@ impl FastFieldReaders {
|
||||
self.fast_field_i64.get(&field).cloned()
|
||||
}
|
||||
|
||||
/// Returns the `i64` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a i64 fast field, this method returns `None`.
|
||||
pub fn date(&self, field: Field) -> Option<FastFieldReader<crate::DateTime>> {
|
||||
self.fast_field_date.get(&field).cloned()
|
||||
}
|
||||
|
||||
/// Returns the `f64` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a f64 fast field, this method returns `None`.
|
||||
@@ -223,6 +255,13 @@ impl FastFieldReaders {
|
||||
self.fast_field_f64s.get(&field).cloned()
|
||||
}
|
||||
|
||||
/// Returns a `crate::DateTime` multi-valued fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a `crate::DateTime` multi-valued fast field, this method returns `None`.
|
||||
pub fn dates(&self, field: Field) -> Option<MultiValueIntFastFieldReader<crate::DateTime>> {
|
||||
self.fast_field_dates.get(&field).cloned()
|
||||
}
|
||||
|
||||
/// Returns the `bytes` fast field reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a bytes fast field, returns `None`.
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::common::BinarySerializable;
|
||||
use crate::common::VInt;
|
||||
use crate::fastfield::{BytesFastFieldWriter, FastFieldSerializer};
|
||||
use crate::postings::UnorderedTermId;
|
||||
use crate::schema::{Cardinality, Document, Field, FieldType, Schema};
|
||||
use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema};
|
||||
use crate::termdict::TermOrdinal;
|
||||
use fnv::FnvHashMap;
|
||||
use std::collections::HashMap;
|
||||
@@ -17,6 +17,14 @@ pub struct FastFieldsWriter {
|
||||
bytes_value_writers: Vec<BytesFastFieldWriter>,
|
||||
}
|
||||
|
||||
fn fast_field_default_value(field_entry: &FieldEntry) -> u64 {
|
||||
match *field_entry.field_type() {
|
||||
FieldType::I64(_) | FieldType::Date(_) => common::i64_to_u64(0i64),
|
||||
FieldType::F64(_) => common::f64_to_u64(0.0f64),
|
||||
_ => 0u64,
|
||||
}
|
||||
}
|
||||
|
||||
impl FastFieldsWriter {
|
||||
/// Create all `FastFieldWriter` required by the schema.
|
||||
pub fn from_schema(schema: &Schema) -> FastFieldsWriter {
|
||||
@@ -25,18 +33,15 @@ impl FastFieldsWriter {
|
||||
let mut bytes_value_writers = Vec::new();
|
||||
|
||||
for (field, field_entry) in schema.fields() {
|
||||
let default_value = match *field_entry.field_type() {
|
||||
FieldType::I64(_) => common::i64_to_u64(0i64),
|
||||
FieldType::F64(_) => common::f64_to_u64(0.0f64),
|
||||
_ => 0u64,
|
||||
};
|
||||
match *field_entry.field_type() {
|
||||
FieldType::I64(ref int_options)
|
||||
| FieldType::U64(ref int_options)
|
||||
| FieldType::F64(ref int_options) => {
|
||||
| FieldType::F64(ref int_options)
|
||||
| FieldType::Date(ref int_options) => {
|
||||
match int_options.get_fastfield_cardinality() {
|
||||
Some(Cardinality::SingleValue) => {
|
||||
let mut fast_field_writer = IntFastFieldWriter::new(field);
|
||||
let default_value = fast_field_default_value(field_entry);
|
||||
fast_field_writer.set_val_if_missing(default_value);
|
||||
single_value_writers.push(fast_field_writer);
|
||||
}
|
||||
|
||||
@@ -142,8 +142,7 @@ pub(crate) fn advance_deletes(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut delete_cursor = segment_entry.delete_cursor().clone();
|
||||
if segment_entry.delete_bitset().is_none() && delete_cursor.get().is_none() {
|
||||
if segment_entry.delete_bitset().is_none() && segment_entry.delete_cursor().get().is_none() {
|
||||
// There has been no `DeleteOperation` between the segment status and `target_opstamp`.
|
||||
return Ok(());
|
||||
}
|
||||
@@ -159,7 +158,7 @@ pub(crate) fn advance_deletes(
|
||||
compute_deleted_bitset(
|
||||
&mut delete_bitset,
|
||||
&segment_reader,
|
||||
&mut delete_cursor,
|
||||
segment_entry.delete_cursor(),
|
||||
&DocToOpstampMapping::None,
|
||||
target_opstamp,
|
||||
)?;
|
||||
@@ -515,9 +514,13 @@ impl IndexWriter {
|
||||
/// Merges a given list of segments
|
||||
///
|
||||
/// `segment_ids` is required to be non-empty.
|
||||
pub async fn merge(&mut self, segment_ids: &[SegmentId]) -> crate::Result<SegmentMeta> {
|
||||
pub fn merge(
|
||||
&mut self,
|
||||
segment_ids: &[SegmentId],
|
||||
) -> impl Future<Output = crate::Result<SegmentMeta>> {
|
||||
let merge_operation = self.segment_updater.make_merge_operation(segment_ids);
|
||||
self.segment_updater.start_merge(merge_operation)?.await
|
||||
let segment_updater = self.segment_updater.clone();
|
||||
async move { segment_updater.start_merge(merge_operation)?.await }
|
||||
}
|
||||
|
||||
/// Closes the current document channel send.
|
||||
|
||||
@@ -33,6 +33,7 @@ pub type DefaultMergePolicy = LogMergePolicy;
|
||||
mod tests {
|
||||
use crate::schema::{self, Schema};
|
||||
use crate::{Index, Term};
|
||||
|
||||
#[test]
|
||||
fn test_advance_delete_bug() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
use crate::Result;
|
||||
use crate::Directory;
|
||||
|
||||
use crate::core::Segment;
|
||||
use crate::core::SegmentComponent;
|
||||
use crate::directory::error::OpenWriteError;
|
||||
use crate::directory::{DirectoryClone, RAMDirectory, TerminatingWrite, WritePtr};
|
||||
use crate::fastfield::FastFieldSerializer;
|
||||
use crate::fieldnorm::FieldNormsSerializer;
|
||||
use crate::postings::InvertedIndexSerializer;
|
||||
use crate::schema::Schema;
|
||||
use crate::store::StoreWriter;
|
||||
|
||||
/// Segment serializer is in charge of laying out on disk
|
||||
@@ -14,25 +17,50 @@ pub struct SegmentSerializer {
|
||||
fast_field_serializer: FastFieldSerializer,
|
||||
fieldnorms_serializer: FieldNormsSerializer,
|
||||
postings_serializer: InvertedIndexSerializer,
|
||||
bundle_writer: Option<(RAMDirectory, WritePtr)>,
|
||||
}
|
||||
|
||||
pub(crate) struct SegmentSerializerWriters {
|
||||
postings_wrt: WritePtr,
|
||||
positions_skip_wrt: WritePtr,
|
||||
positions_wrt: WritePtr,
|
||||
terms_wrt: WritePtr,
|
||||
fast_field_wrt: WritePtr,
|
||||
fieldnorms_wrt: WritePtr,
|
||||
store_wrt: WritePtr,
|
||||
}
|
||||
|
||||
impl SegmentSerializerWriters {
|
||||
pub(crate) fn for_segment(segment: &mut Segment) -> Result<Self, OpenWriteError> {
|
||||
Ok(SegmentSerializerWriters {
|
||||
postings_wrt: segment.open_write(SegmentComponent::POSTINGS)?,
|
||||
positions_skip_wrt: segment.open_write(SegmentComponent::POSITIONS)?,
|
||||
positions_wrt: segment.open_write(SegmentComponent::POSITIONSSKIP)?,
|
||||
terms_wrt: segment.open_write(SegmentComponent::TERMS)?,
|
||||
fast_field_wrt: segment.open_write(SegmentComponent::FASTFIELDS)?,
|
||||
fieldnorms_wrt: segment.open_write(SegmentComponent::FIELDNORMS)?,
|
||||
store_wrt: segment.open_write(SegmentComponent::STORE)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentSerializer {
|
||||
/// Creates a new `SegmentSerializer`.
|
||||
pub fn for_segment(segment: &mut Segment) -> Result<SegmentSerializer> {
|
||||
let store_write = segment.open_write(SegmentComponent::STORE)?;
|
||||
|
||||
let fast_field_write = segment.open_write(SegmentComponent::FASTFIELDS)?;
|
||||
let fast_field_serializer = FastFieldSerializer::from_write(fast_field_write)?;
|
||||
|
||||
let fieldnorms_write = segment.open_write(SegmentComponent::FIELDNORMS)?;
|
||||
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;
|
||||
|
||||
let postings_serializer = InvertedIndexSerializer::open(segment)?;
|
||||
pub(crate) fn new(schema: Schema, writers: SegmentSerializerWriters) -> crate::Result<Self> {
|
||||
let fast_field_serializer = FastFieldSerializer::from_write(writers.fast_field_wrt)?;
|
||||
let fieldnorms_serializer = FieldNormsSerializer::from_write(writers.fieldnorms_wrt)?;
|
||||
let postings_serializer = InvertedIndexSerializer::open(
|
||||
schema,
|
||||
writers.terms_wrt,
|
||||
writers.postings_wrt,
|
||||
writers.positions_wrt,
|
||||
writers.positions_skip_wrt,
|
||||
);
|
||||
Ok(SegmentSerializer {
|
||||
store_writer: StoreWriter::new(store_write),
|
||||
store_writer: StoreWriter::new(writers.store_wrt),
|
||||
fast_field_serializer,
|
||||
fieldnorms_serializer,
|
||||
postings_serializer,
|
||||
bundle_writer: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -57,11 +85,15 @@ impl SegmentSerializer {
|
||||
}
|
||||
|
||||
/// Finalize the segment serialization.
|
||||
pub fn close(self) -> Result<()> {
|
||||
pub fn close(mut self) -> crate::Result<()> {
|
||||
self.fast_field_serializer.close()?;
|
||||
self.postings_serializer.close()?;
|
||||
self.store_writer.close()?;
|
||||
self.fieldnorms_serializer.close()?;
|
||||
if let Some((ram_directory, mut bundle_wrt)) = self.bundle_writer.take() {
|
||||
ram_directory.serialize_bundle(&mut bundle_wrt)?;
|
||||
bundle_wrt.terminate()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::indexer::index_writer::advance_deletes;
|
||||
use crate::indexer::merge_operation::MergeOperationInventory;
|
||||
use crate::indexer::merger::IndexMerger;
|
||||
use crate::indexer::segment_manager::SegmentsStatus;
|
||||
use crate::indexer::segment_serializer::SegmentSerializerWriters;
|
||||
use crate::indexer::stamper::Stamper;
|
||||
use crate::indexer::SegmentEntry;
|
||||
use crate::indexer::SegmentSerializer;
|
||||
@@ -132,7 +133,9 @@ fn merge(
|
||||
let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?;
|
||||
|
||||
// ... we just serialize this index merger in our new segment to merge the two segments.
|
||||
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?;
|
||||
let segment_serializer_wrts = SegmentSerializerWriters::for_segment(&mut merged_segment)?;
|
||||
let segment_serializer =
|
||||
SegmentSerializer::new(merged_segment.schema(), segment_serializer_wrts)?;
|
||||
|
||||
let num_docs = merger.write(segment_serializer)?;
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::core::Segment;
|
||||
use crate::core::SerializableSegment;
|
||||
use crate::fastfield::FastFieldsWriter;
|
||||
use crate::fieldnorm::FieldNormsWriter;
|
||||
use crate::indexer::segment_serializer::SegmentSerializer;
|
||||
use crate::indexer::segment_serializer::{SegmentSerializer, SegmentSerializerWriters};
|
||||
use crate::postings::compute_table_size;
|
||||
use crate::postings::MultiFieldPostingsWriter;
|
||||
use crate::schema::FieldType;
|
||||
@@ -69,7 +69,8 @@ impl SegmentWriter {
|
||||
schema: &Schema,
|
||||
) -> Result<SegmentWriter> {
|
||||
let table_num_bits = initial_table_size(memory_budget)?;
|
||||
let segment_serializer = SegmentSerializer::for_segment(&mut segment)?;
|
||||
let segment_serializer_wrts = SegmentSerializerWriters::for_segment(&mut segment)?;
|
||||
let segment_serializer = SegmentSerializer::new(segment.schema(), segment_serializer_wrts)?;
|
||||
let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
|
||||
let tokenizers = schema
|
||||
.fields()
|
||||
|
||||
69
src/lib.rs
69
src/lib.rs
@@ -940,4 +940,73 @@ mod tests {
|
||||
assert_eq!(fast_field_reader.get(0), 4f64)
|
||||
}
|
||||
}
|
||||
|
||||
// motivated by #729
|
||||
#[test]
|
||||
fn test_update_via_delete_insert() {
|
||||
use crate::collector::Count;
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::query::AllQuery;
|
||||
use crate::SegmentId;
|
||||
use futures::executor::block_on;
|
||||
|
||||
const DOC_COUNT: u64 = 2u64;
|
||||
|
||||
let mut schema_builder = SchemaBuilder::default();
|
||||
let id = schema_builder.add_u64_field("id", INDEXED);
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let index = Index::create_in_ram(schema.clone());
|
||||
let index_reader = index.reader().unwrap();
|
||||
|
||||
let mut index_writer = index.writer(3_000_000).unwrap();
|
||||
index_writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
|
||||
for doc_id in 0u64..DOC_COUNT {
|
||||
index_writer.add_document(doc!(id => doc_id));
|
||||
}
|
||||
index_writer.commit().unwrap();
|
||||
|
||||
index_reader.reload().unwrap();
|
||||
let searcher = index_reader.searcher();
|
||||
|
||||
assert_eq!(
|
||||
searcher.search(&AllQuery, &Count).unwrap(),
|
||||
DOC_COUNT as usize
|
||||
);
|
||||
|
||||
// update the 10 elements by deleting and re-adding
|
||||
for doc_id in 0u64..DOC_COUNT {
|
||||
index_writer.delete_term(Term::from_field_u64(id, doc_id));
|
||||
index_writer.commit().unwrap();
|
||||
index_reader.reload().unwrap();
|
||||
let doc = doc!(id => doc_id);
|
||||
index_writer.add_document(doc);
|
||||
index_writer.commit().unwrap();
|
||||
index_reader.reload().unwrap();
|
||||
let searcher = index_reader.searcher();
|
||||
// The number of document should be stable.
|
||||
assert_eq!(
|
||||
searcher.search(&AllQuery, &Count).unwrap(),
|
||||
DOC_COUNT as usize
|
||||
);
|
||||
}
|
||||
|
||||
index_reader.reload().unwrap();
|
||||
let searcher = index_reader.searcher();
|
||||
let segment_ids: Vec<SegmentId> = searcher
|
||||
.segment_readers()
|
||||
.into_iter()
|
||||
.map(|reader| reader.segment_id())
|
||||
.collect();
|
||||
block_on(index_writer.merge(&segment_ids)).unwrap();
|
||||
|
||||
index_reader.reload().unwrap();
|
||||
let searcher = index_reader.searcher();
|
||||
|
||||
assert_eq!(
|
||||
searcher.search(&AllQuery, &Count).unwrap(),
|
||||
DOC_COUNT as usize
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ pub mod tests {
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut segment = index.new_segment();
|
||||
let mut posting_serializer = InvertedIndexSerializer::open(&mut segment).unwrap();
|
||||
let mut posting_serializer = InvertedIndexSerializer::for_segment(&mut segment).unwrap();
|
||||
{
|
||||
let mut field_serializer = posting_serializer.new_field(text_field, 120 * 4).unwrap();
|
||||
field_serializer.new_term("abc".as_bytes()).unwrap();
|
||||
|
||||
@@ -10,8 +10,8 @@ use crate::postings::USE_SKIP_INFO_LIMIT;
|
||||
use crate::schema::Schema;
|
||||
use crate::schema::{Field, FieldEntry, FieldType};
|
||||
use crate::termdict::{TermDictionaryBuilder, TermOrdinal};
|
||||
use crate::DocId;
|
||||
use crate::Result;
|
||||
use crate::{Directory, DocId};
|
||||
use std::io::{self, Write};
|
||||
|
||||
/// `InvertedIndexSerializer` is in charge of serializing
|
||||
@@ -54,33 +54,36 @@ pub struct InvertedIndexSerializer {
|
||||
}
|
||||
|
||||
impl InvertedIndexSerializer {
|
||||
/// Open a new `InvertedIndexSerializer` for the given segment
|
||||
fn create(
|
||||
terms_write: CompositeWrite<WritePtr>,
|
||||
postings_write: CompositeWrite<WritePtr>,
|
||||
positions_write: CompositeWrite<WritePtr>,
|
||||
positionsidx_write: CompositeWrite<WritePtr>,
|
||||
schema: Schema,
|
||||
) -> Result<InvertedIndexSerializer> {
|
||||
Ok(InvertedIndexSerializer {
|
||||
terms_write,
|
||||
postings_write,
|
||||
positions_write,
|
||||
positionsidx_write,
|
||||
pub(crate) fn for_segment(segment: &mut Segment) -> crate::Result<Self> {
|
||||
let schema = segment.schema();
|
||||
use crate::core::SegmentComponent;
|
||||
let terms_wrt = segment.open_write(SegmentComponent::TERMS)?;
|
||||
let postings_wrt = segment.open_write(SegmentComponent::POSTINGS)?;
|
||||
let positions_wrt = segment.open_write(SegmentComponent::POSITIONS)?;
|
||||
let positions_idx_wrt = segment.open_write(SegmentComponent::POSITIONSSKIP)?;
|
||||
Ok(Self::open(
|
||||
schema,
|
||||
})
|
||||
terms_wrt,
|
||||
postings_wrt,
|
||||
positions_wrt,
|
||||
positions_idx_wrt,
|
||||
))
|
||||
}
|
||||
|
||||
/// Open a new `PostingsSerializer` for the given segment
|
||||
pub fn open(segment: &mut Segment) -> Result<InvertedIndexSerializer> {
|
||||
use crate::SegmentComponent::{POSITIONS, POSITIONSSKIP, POSTINGS, TERMS};
|
||||
InvertedIndexSerializer::create(
|
||||
CompositeWrite::wrap(segment.open_write(TERMS)?),
|
||||
CompositeWrite::wrap(segment.open_write(POSTINGS)?),
|
||||
CompositeWrite::wrap(segment.open_write(POSITIONS)?),
|
||||
CompositeWrite::wrap(segment.open_write(POSITIONSSKIP)?),
|
||||
segment.schema(),
|
||||
)
|
||||
pub(crate) fn open(
|
||||
schema: Schema,
|
||||
terms_wrt: WritePtr,
|
||||
postings_wrt: WritePtr,
|
||||
positions_wrt: WritePtr,
|
||||
positions_idx_wrt: WritePtr,
|
||||
) -> InvertedIndexSerializer {
|
||||
InvertedIndexSerializer {
|
||||
terms_write: CompositeWrite::wrap(terms_wrt),
|
||||
postings_write: CompositeWrite::wrap(postings_wrt),
|
||||
positions_write: CompositeWrite::wrap(positions_wrt),
|
||||
positionsidx_write: CompositeWrite::wrap(positions_idx_wrt),
|
||||
schema,
|
||||
}
|
||||
}
|
||||
|
||||
/// Must be called before starting pushing terms of
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::query::PhraseQuery;
|
||||
use crate::query::Query;
|
||||
use crate::query::RangeQuery;
|
||||
use crate::query::TermQuery;
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::schema::{Facet, IndexRecordOption};
|
||||
use crate::schema::{Field, Schema};
|
||||
use crate::schema::{FieldType, Term};
|
||||
use crate::tokenizer::TokenizerManager;
|
||||
@@ -319,7 +319,10 @@ impl QueryParser {
|
||||
))
|
||||
}
|
||||
}
|
||||
FieldType::HierarchicalFacet => Ok(vec![(0, Term::from_field_text(field, phrase))]),
|
||||
FieldType::HierarchicalFacet => {
|
||||
let facet = Facet::from_text(phrase);
|
||||
Ok(vec![(0, Term::from_field_text(field, facet.encoded_str()))])
|
||||
}
|
||||
FieldType::Bytes => {
|
||||
let field_name = self.schema.get_field_name(field).to_string();
|
||||
Err(QueryParserError::FieldNotIndexed(field_name))
|
||||
@@ -554,6 +557,7 @@ mod test {
|
||||
schema_builder.add_text_field("with_stop_words", text_options);
|
||||
schema_builder.add_date_field("date", INDEXED);
|
||||
schema_builder.add_f64_field("float", INDEXED);
|
||||
schema_builder.add_facet_field("facet");
|
||||
let schema = schema_builder.build();
|
||||
let default_fields = vec![title, text];
|
||||
let tokenizer_manager = TokenizerManager::default();
|
||||
@@ -588,9 +592,13 @@ mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_parse_query_simple() {
|
||||
pub fn test_parse_query_facet() {
|
||||
let query_parser = make_query_parser();
|
||||
assert!(query_parser.parse_query("toto").is_ok());
|
||||
let query = query_parser.parse_query("facet:/root/branch/leaf").unwrap();
|
||||
assert_eq!(
|
||||
format!("{:?}", query),
|
||||
"TermQuery(Term(field=11,bytes=[114, 111, 111, 116, 0, 98, 114, 97, 110, 99, 104, 0, 108, 101, 97, 102]))"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -166,7 +166,7 @@ impl SchemaBuilder {
|
||||
}
|
||||
|
||||
/// Adds a field entry to the schema in build.
|
||||
fn add_field(&mut self, field_entry: FieldEntry) -> Field {
|
||||
pub fn add_field(&mut self, field_entry: FieldEntry) -> Field {
|
||||
let field = Field::from_field_id(self.fields.len() as u32);
|
||||
let field_name = field_entry.name().to_string();
|
||||
self.fields.push(field_entry);
|
||||
@@ -401,6 +401,7 @@ pub enum DocParsingError {
|
||||
mod tests {
|
||||
|
||||
use crate::schema::field_type::ValueParsingError;
|
||||
use crate::schema::int_options::Cardinality::SingleValue;
|
||||
use crate::schema::schema::DocParsingError::NotJSON;
|
||||
use crate::schema::*;
|
||||
use matches::{assert_matches, matches};
|
||||
@@ -715,4 +716,94 @@ mod tests {
|
||||
assert_matches!(json_err, Err(NotJSON(_)));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_schema_add_field() {
|
||||
let mut schema_builder = SchemaBuilder::default();
|
||||
let id_options = TextOptions::default().set_stored().set_indexing_options(
|
||||
TextFieldIndexing::default()
|
||||
.set_tokenizer("raw")
|
||||
.set_index_option(IndexRecordOption::Basic),
|
||||
);
|
||||
let timestamp_options = IntOptions::default()
|
||||
.set_stored()
|
||||
.set_indexed()
|
||||
.set_fast(SingleValue);
|
||||
schema_builder.add_text_field("_id", id_options);
|
||||
schema_builder.add_date_field("_timestamp", timestamp_options);
|
||||
|
||||
let schema_content = r#"[
|
||||
{
|
||||
"name": "text",
|
||||
"type": "text",
|
||||
"options": {
|
||||
"indexing": {
|
||||
"record": "position",
|
||||
"tokenizer": "default"
|
||||
},
|
||||
"stored": false
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "popularity",
|
||||
"type": "i64",
|
||||
"options": {
|
||||
"indexed": false,
|
||||
"fast": "single",
|
||||
"stored": true
|
||||
}
|
||||
}
|
||||
]"#;
|
||||
let tmp_schema: Schema =
|
||||
serde_json::from_str(&schema_content).expect("error while reading json");
|
||||
for (_field, field_entry) in tmp_schema.fields() {
|
||||
schema_builder.add_field(field_entry.clone());
|
||||
}
|
||||
|
||||
let schema = schema_builder.build();
|
||||
let schema_json = serde_json::to_string_pretty(&schema).unwrap();
|
||||
let expected = r#"[
|
||||
{
|
||||
"name": "_id",
|
||||
"type": "text",
|
||||
"options": {
|
||||
"indexing": {
|
||||
"record": "basic",
|
||||
"tokenizer": "raw"
|
||||
},
|
||||
"stored": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "_timestamp",
|
||||
"type": "date",
|
||||
"options": {
|
||||
"indexed": true,
|
||||
"fast": "single",
|
||||
"stored": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "text",
|
||||
"type": "text",
|
||||
"options": {
|
||||
"indexing": {
|
||||
"record": "position",
|
||||
"tokenizer": "default"
|
||||
},
|
||||
"stored": false
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "popularity",
|
||||
"type": "i64",
|
||||
"options": {
|
||||
"indexed": false,
|
||||
"fast": "single",
|
||||
"stored": true
|
||||
}
|
||||
}
|
||||
]"#;
|
||||
assert_eq!(schema_json, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,8 +209,8 @@ impl From<f64> for Value {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DateTime> for Value {
|
||||
fn from(date_time: DateTime) -> Value {
|
||||
impl From<crate::DateTime> for Value {
|
||||
fn from(date_time: crate::DateTime) -> Value {
|
||||
Value::Date(date_time)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ use self::compression_snap::{compress, decompress};
|
||||
pub mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::directory::{Directory, RAMDirectory, WritePtr};
|
||||
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
|
||||
use crate::schema::Document;
|
||||
use crate::schema::FieldValue;
|
||||
use crate::schema::Schema;
|
||||
|
||||
@@ -36,7 +36,7 @@ pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
|
||||
mod tests {
|
||||
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
|
||||
use crate::core::Index;
|
||||
use crate::directory::{Directory, RAMDirectory, ReadOnlySource};
|
||||
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, ReadOnlySource};
|
||||
use crate::postings::TermInfo;
|
||||
use crate::schema::{Document, FieldType, Schema, TEXT};
|
||||
use std::path::PathBuf;
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use fail;
|
||||
use std::path::Path;
|
||||
use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory, TerminatingWrite};
|
||||
use tantivy::directory::{
|
||||
Directory, ManagedDirectory, RAMDirectory, ReadOnlyDirectory, TerminatingWrite,
|
||||
};
|
||||
use tantivy::doc;
|
||||
use tantivy::schema::{Schema, TEXT};
|
||||
use tantivy::{Index, Term};
|
||||
|
||||
Reference in New Issue
Block a user