Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-08 18:12:55 +00:00)

Compare commits: `segment_wr...storewrite` (8 commits)
| Author | SHA1 | Date |
|---|---|---|
| | ee6c839ee6 | |
| | b3f0ef0878 | |
| | 04304262ba | |
| | 920ced364a | |
| | e0499118e2 | |
| | 50b5efae46 | |
| | 486b8fa9c5 | |
| | b2baed9bdd | |
```diff
@@ -25,8 +25,7 @@ snap = "1"
 atomicwrites = {version="0.2.2", optional=true}
 tempfile = "3.0"
 log = "0.4"
-serde = "1.0"
-serde_derive = "1.0"
+serde = {version="1.0", features=["derive"]}
 serde_json = "1.0"
 num_cpus = "1.2"
 fs2={version="0.4", optional=true}
```
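The change above swaps the standalone `serde_derive` crate for serde's built-in `derive` feature; the hunks below follow through across the codebase. A minimal sketch of the two import styles involved (the struct names are illustrative, not from the diff):

```rust
// With serde = {version="1.0", features=["derive"]}, the derive macros are
// re-exported by the serde crate itself, so no
// `#[macro_use] extern crate serde_derive;` is required.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Example {
    name: String,
}

// The fully qualified form used in several hunks below needs no macro import at all:
#[derive(serde::Serialize, serde::Deserialize)]
struct QualifiedExample {
    id: u32,
}
```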
```diff
@@ -40,7 +40,7 @@ performance for different type of queries / collection.
 # Features
 
 - Full-text search
-- Configurable tokenizer (stemming available for 17 Latin languages with third party support for Chinese ([tantivy-jieba](https://crates.io/crates/tantivy-jieba) and [cang-jie](https://crates.io/crates/cang-jie)) and [Japanese](https://crates.io/crates/tantivy-tokenizer-tiny-segmenter))
+- Configurable tokenizer (stemming available for 17 Latin languages with third party support for Chinese ([tantivy-jieba](https://crates.io/crates/tantivy-jieba) and [cang-jie](https://crates.io/crates/cang-jie)), Japanese ([lindera](https://github.com/lindera-morphology/lindera-tantivy) and [tantivy-tokenizer-tiny-segmenter](https://crates.io/crates/tantivy-tokenizer-tiny-segmenter)) and Korean ([lindera](https://github.com/lindera-morphology/lindera-tantivy) + [lindera-ko-dic-builder](https://github.com/lindera-morphology/lindera-ko-dic-builder))
 - Fast (check out the :racehorse: :sparkles: [benchmark](https://tantivy-search.github.io/bench/) :sparkles: :racehorse:)
 - Tiny startup time (<10ms), perfect for command line tools
 - BM25 scoring (the same as Lucene)
```
```diff
@@ -9,11 +9,10 @@
 // - import tokenized text straight from json,
 // - perform a search on documents with pre-tokenized text
 
-use tantivy::tokenizer::{PreTokenizedString, SimpleTokenizer, Token, Tokenizer};
-
 use tantivy::collector::{Count, TopDocs};
 use tantivy::query::TermQuery;
 use tantivy::schema::*;
+use tantivy::tokenizer::{PreTokenizedString, SimpleTokenizer, Token, Tokenizer};
 use tantivy::{doc, Index, ReloadPolicy};
 use tempfile::TempDir;
 
```
```diff
@@ -1,4 +1,3 @@
-use super::segment::create_segment;
 use super::segment::Segment;
 use crate::core::Executor;
 use crate::core::IndexMeta;
```
```diff
@@ -337,7 +336,7 @@ impl Index {
 
     #[doc(hidden)]
     pub fn segment(&self, segment_meta: SegmentMeta) -> Segment {
-        create_segment(self.clone(), segment_meta)
+        Segment::for_index(self.clone(), segment_meta)
     }
 
     /// Creates a new segment.
```
```diff
@@ -4,6 +4,7 @@ use crate::schema::Schema;
 use crate::Opstamp;
 use census::{Inventory, TrackedObject};
 use serde;
+use serde::{Deserialize, Serialize};
 use serde_json;
 use std::collections::HashSet;
 use std::fmt;
```
```diff
@@ -24,15 +24,12 @@ impl fmt::Debug for Segment {
     }
 }
 
-/// Creates a new segment given an `Index` and a `SegmentId`
-///
-/// The function is here to make it private outside `tantivy`.
-/// #[doc(hidden)]
-pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment {
-    Segment { index, meta }
-}
-
 impl Segment {
+    /// Creates a new segment given an `Index` and a `SegmentId`
+    pub(crate) fn for_index(index: Index, meta: SegmentMeta) -> Segment {
+        Segment { index, meta }
+    }
+
     /// Returns the index the segment belongs to.
     pub fn index(&self) -> &Index {
         &self.index
```
```diff
@@ -4,6 +4,7 @@ use uuid::Uuid;
 
 #[cfg(test)]
 use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
 use std::error::Error;
 use std::str::FromStr;
 #[cfg(test)]
```
```diff
@@ -22,6 +22,7 @@ use crate::directory::WatchHandle;
 use crate::directory::{TerminatingWrite, WritePtr};
 use atomicwrites;
 use memmap::Mmap;
+use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::convert::From;
 use std::fmt;
```
```diff
@@ -13,6 +13,7 @@ mod footer;
 mod managed_directory;
 mod ram_directory;
 mod read_only_source;
+mod spilling_writer;
 mod watch_event_router;
 
 /// Errors specific to the directory module.
```
```diff
@@ -22,6 +23,7 @@ pub use self::directory::DirectoryLock;
 pub use self::directory::{Directory, DirectoryClone};
 pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
 pub use self::ram_directory::RAMDirectory;
+pub(crate) use self::spilling_writer::SpillingWriter;
 pub use self::read_only_source::ReadOnlySource;
 pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
 use std::io::{self, BufWriter, Write};
```
```diff
@@ -79,10 +81,16 @@ impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
     }
 }
 
+impl TerminatingWrite for Vec<u8> {
+    fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 impl<'a> TerminatingWrite for &'a mut Vec<u8> {
     fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
-        self.flush()
+        Ok(())
     }
 }
 
```
```diff
@@ -144,6 +144,22 @@ impl RAMDirectory {
     pub fn total_mem_usage(&self) -> usize {
         self.fs.read().unwrap().total_mem_usage()
     }
+
+    /// Write a copy of all of the files saved in the RAMDirectory in the target `Directory`.
+    ///
+    /// Files are all written using the `Directory::write` meaning, even if they were
+    /// written using the `atomic_write` api.
+    ///
+    /// If an error is encountered, files may be persisted partially.
+    pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> {
+        let wlock = self.fs.write().unwrap();
+        for (path, source) in wlock.fs.iter() {
+            let mut dest_wrt = dest.open_write(path)?;
+            dest_wrt.write_all(source.as_slice())?;
+            dest_wrt.terminate()?;
+        }
+        Ok(())
+    }
 }
 
 impl Directory for RAMDirectory {
```
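A hedged sketch of how `persist` might be used, assuming the default `mmap` feature so `MmapDirectory` is available; the function and path names are illustrative, not from the diff:

```rust
use crate::directory::{MmapDirectory, RAMDirectory};
use std::path::Path;

// Build an index in a RAMDirectory, then copy every file to a directory on disk.
fn persist_to_disk(ram_directory: &RAMDirectory, dir_path: &Path) -> crate::Result<()> {
    let mut mmap_directory = MmapDirectory::open(dir_path)?;
    // Copies all files via Directory::open_write, as documented above.
    ram_directory.persist(&mut mmap_directory)?;
    Ok(())
}
```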
```diff
@@ -204,3 +220,28 @@ impl Directory for RAMDirectory {
         Ok(self.fs.write().unwrap().watch(watch_callback))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::RAMDirectory;
+    use crate::Directory;
+    use std::io::Write;
+    use std::path::Path;
+
+    #[test]
+    fn test_persist() {
+        let msg_atomic: &'static [u8] = b"atomic is the way";
+        let msg_seq: &'static [u8] = b"sequential is the way";
+        let path_atomic: &'static Path = Path::new("atomic");
+        let path_seq: &'static Path = Path::new("seq");
+        let mut directory = RAMDirectory::create();
+        assert!(directory.atomic_write(path_atomic, msg_atomic).is_ok());
+        let mut wrt = directory.open_write(path_seq).unwrap();
+        assert!(wrt.write_all(msg_seq).is_ok());
+        assert!(wrt.flush().is_ok());
+        let mut directory_copy = RAMDirectory::create();
+        assert!(directory.persist(&mut directory_copy).is_ok());
+        assert_eq!(directory_copy.atomic_read(path_atomic).unwrap(), msg_atomic);
+        assert_eq!(directory_copy.atomic_read(path_seq).unwrap(), msg_seq);
+    }
+}
```
src/directory/spilling_writer.rs (new file, 180 lines)

```diff
@@ -0,0 +1,180 @@
+use crate::directory::{WritePtr, TerminatingWrite};
+use std::io::{self, Write};
+
+enum SpillingState {
+    Buffer {
+        buffer: Vec<u8>,
+        capacity: usize,
+        write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>,
+    },
+    Spilled(WritePtr),
+}
+
+impl SpillingState {
+    fn new(
+        limit: usize,
+        write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>,
+    ) -> SpillingState {
+        SpillingState::Buffer {
+            buffer: Vec::with_capacity(limit),
+            capacity: limit,
+            write_factory,
+        }
+    }
+
+    fn reserve(self, extra_capacity: usize) -> io::Result<SpillingState> {
+        match self {
+            SpillingState::Buffer {
+                buffer,
+                capacity,
+                write_factory,
+            } => {
+                if capacity >= extra_capacity {
+                    Ok(SpillingState::Buffer {
+                        buffer,
+                        capacity: capacity - extra_capacity,
+                        write_factory,
+                    })
+                } else {
+                    let mut wrt = write_factory()?;
+                    wrt.write_all(&buffer[..])?;
+                    Ok(SpillingState::Spilled(wrt))
+                }
+            }
+            SpillingState::Spilled(wrt) => Ok(SpillingState::Spilled(wrt)),
+        }
+    }
+}
+
+pub struct SpillingWriter {
+    state: Option<SpillingState>,
+}
+
+impl SpillingWriter {
+    pub fn new(
+        limit: usize,
+        write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>,
+    ) -> SpillingWriter {
+        let state = SpillingState::new(limit, write_factory);
+        SpillingWriter {
+            state: Some(state),
+        }
+    }
+
+    pub fn flush_and_finalize(self) -> io::Result<()> {
+        if let SpillingState::Buffer {
+            buffer,
+            write_factory,
+            ..
+        } = self.state.expect("State cannot be none")
+        {
+            let mut wrt = write_factory()?;
+            wrt.write_all(&buffer[..])?;
+            wrt.flush()?;
+            wrt.terminate()?;
+        }
+        Ok(())
+    }
+
+    pub fn finalize(self) -> io::Result<SpillingResult> {
+        match self.state.expect("state cannot be None") {
+            SpillingState::Spilled(mut wrt) => {
+                wrt.flush()?;
+                Ok(SpillingResult::Spilled)
+            }
+            SpillingState::Buffer { buffer, .. } => Ok(SpillingResult::Buffer(buffer)),
+        }
+    }
+}
+
+pub enum SpillingResult {
+    Spilled,
+    Buffer(Vec<u8>),
+}
+
+impl io::Write for SpillingWriter {
+    fn write(&mut self, payload: &[u8]) -> io::Result<usize> {
+        self.write_all(payload)?;
+        Ok(payload.len())
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        if let Some(SpillingState::Spilled(wrt)) = &mut self.state {
+            wrt.flush()?;
+        }
+        Ok(())
+    }
+
+    fn write_all(&mut self, payload: &[u8]) -> io::Result<()> {
+        let state_opt: Option<io::Result<SpillingState>> = self
+            .state
+            .take()
+            .map(|mut state| {
+                state = state.reserve(payload.len())?;
+                match &mut state {
+                    SpillingState::Buffer { buffer, .. } => {
+                        buffer.extend_from_slice(payload);
+                    }
+                    SpillingState::Spilled(wrt) => {
+                        wrt.write_all(payload)?;
+                    }
+                }
+                Ok(state)
+            });
+        self.state = state_opt.transpose()?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::SpillingWriter;
+    use crate::directory::spilling_writer::SpillingResult;
+    use crate::directory::RAMDirectory;
+    use crate::Directory;
+    use std::io::{self, Write};
+    use std::path::Path;
+
+    #[test]
+    fn test_no_spilling() {
+        let ram_directory = RAMDirectory::create();
+        let mut ram_directory_clone = ram_directory.clone();
+        let path = Path::new("test");
+        let write_factory = Box::new(move || {
+            ram_directory_clone
+                .open_write(path)
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
+        });
+        let mut spilling_wrt = SpillingWriter::new(10, write_factory);
+        assert!(spilling_wrt.write_all(b"abcd").is_ok());
+        if let SpillingResult::Buffer(buf) = spilling_wrt.finalize().unwrap() {
+            assert_eq!(buf, b"abcd")
+        } else {
+            panic!("spill writer should not have spilled");
+        }
+        assert!(!ram_directory.exists(path));
+    }
+
+    #[test]
+    fn test_spilling() {
+        let ram_directory = RAMDirectory::create();
+        let mut ram_directory_clone = ram_directory.clone();
+        let path = Path::new("test");
+        let write_factory = Box::new(move || {
+            ram_directory_clone
+                .open_write(path)
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
+        });
+        let mut spilling_wrt = SpillingWriter::new(10, write_factory);
+        assert!(spilling_wrt.write_all(b"abcd").is_ok());
+        assert!(spilling_wrt.write_all(b"efghijklmnop").is_ok());
+        if let SpillingResult::Spilled = spilling_wrt.finalize().unwrap() {
+        } else {
+            panic!("spill writer should have spilled");
+        }
+        assert_eq!(
+            ram_directory.atomic_read(path).unwrap(),
+            b"abcdefghijklmnop"
+        );
+    }
+}
```
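The tests above exercise `finalize`; the `SegmentWriter` changes below instead go through `flush_and_finalize`, which pushes the buffer through the factory even when the spill limit was never hit. A minimal sketch of that path, assuming the same `RAMDirectory`-backed factory as the tests (in-crate code, since `SpillingWriter` is `pub(crate)`):

```rust
use crate::directory::{RAMDirectory, SpillingWriter};
use crate::Directory;
use std::io::{self, Write};
use std::path::Path;

fn buffered_then_persisted() -> io::Result<()> {
    let ram_directory = RAMDirectory::create();
    let mut ram_directory_clone = ram_directory.clone();
    let path = Path::new("store");
    let write_factory = Box::new(move || {
        ram_directory_clone
            .open_write(path)
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
    });
    // A 1_000-byte budget: this payload stays in the in-memory buffer...
    let mut wrt = SpillingWriter::new(1_000, write_factory);
    wrt.write_all(b"small payload")?;
    // ...until flush_and_finalize invokes the factory and persists it.
    wrt.flush_and_finalize()?;
    let expected: &[u8] = b"small payload";
    assert_eq!(ram_directory.atomic_read(path).unwrap(), expected);
    Ok(())
}
```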
```diff
@@ -155,6 +155,8 @@ pub(crate) fn advance_deletes(
         None => BitSet::with_max_value(max_doc),
     };
 
+    let num_deleted_docs_before = segment.meta().num_deleted_docs();
+
     compute_deleted_bitset(
         &mut delete_bitset,
         &segment_reader,
@@ -164,6 +166,8 @@ pub(crate) fn advance_deletes(
     )?;
 
     // TODO optimize
+    // It should be possible to do something smarter by manipulation bitsets directly
+    // to compute this union.
     if let Some(seg_delete_bitset) = segment_reader.delete_bitset() {
         for doc in 0u32..max_doc {
             if seg_delete_bitset.is_deleted(doc) {
@@ -172,8 +176,9 @@ pub(crate) fn advance_deletes(
         }
     }
 
-    let num_deleted_docs = delete_bitset.len();
-    if num_deleted_docs > 0 {
+    let num_deleted_docs: u32 = delete_bitset.len() as u32;
+    if num_deleted_docs > num_deleted_docs_before {
+        // There are new deletes. We need to write a new delete file.
         segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
         let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
         write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?;
```
```diff
@@ -803,6 +808,46 @@ mod tests {
         assert_eq!(batch_opstamp1, 2u64);
     }
 
+    #[test]
+    fn test_no_need_to_rewrite_delete_file_if_no_new_deletes() {
+        let mut schema_builder = schema::Schema::builder();
+        let text_field = schema_builder.add_text_field("text", schema::TEXT);
+        let index = Index::create_in_ram(schema_builder.build());
+
+        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+        index_writer.add_document(doc!(text_field => "hello1"));
+        index_writer.add_document(doc!(text_field => "hello2"));
+        assert!(index_writer.commit().is_ok());
+
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
+        assert_eq!(searcher.segment_readers().len(), 1);
+        assert_eq!(searcher.segment_reader(0u32).num_deleted_docs(), 0);
+
+        index_writer.delete_term(Term::from_field_text(text_field, "hello1"));
+        assert!(index_writer.commit().is_ok());
+
+        assert!(reader.reload().is_ok());
+        let searcher = reader.searcher();
+        assert_eq!(searcher.segment_readers().len(), 1);
+        assert_eq!(searcher.segment_reader(0u32).num_deleted_docs(), 1);
+
+        let previous_delete_opstamp = index.load_metas().unwrap().segments[0].delete_opstamp();
+
+        // All docs containing hello1 have been already removed.
+        // We should not update the delete meta.
+        index_writer.delete_term(Term::from_field_text(text_field, "hello1"));
+        assert!(index_writer.commit().is_ok());
+
+        assert!(reader.reload().is_ok());
+        let searcher = reader.searcher();
+        assert_eq!(searcher.segment_readers().len(), 1);
+        assert_eq!(searcher.segment_reader(0u32).num_deleted_docs(), 1);
+
+        let after_delete_opstamp = index.load_metas().unwrap().segments[0].delete_opstamp();
+        assert_eq!(after_delete_opstamp, previous_delete_opstamp);
+    }
+
     #[test]
     fn test_ordered_batched_operations() {
         // * one delete for `doc!(field=>"a")`
```
```diff
@@ -2,6 +2,7 @@ use crate::common::MAX_DOC_LIMIT;
 use crate::core::Segment;
 use crate::core::SegmentReader;
 use crate::core::SerializableSegment;
+use crate::directory::WritePtr;
 use crate::docset::DocSet;
 use crate::fastfield::BytesFastFieldReader;
 use crate::fastfield::DeleteBitSet;
@@ -661,7 +662,8 @@ impl IndexMerger {
         Ok(term_ordinal_mappings)
     }
 
-    fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> crate::Result<()> {
+    pub fn write_storable_fields(&self, store_wrt: WritePtr) -> crate::Result<()> {
+        let mut store_writer = StoreWriter::new(store_wrt);
         for reader in &self.readers {
             let store_reader = reader.get_store_reader();
             if reader.num_deleted_docs() > 0 {
@@ -673,6 +675,7 @@ impl IndexMerger {
                 store_writer.stack(&store_reader)?;
             }
         }
+        store_writer.close()?;
         Ok(())
     }
 }
@@ -682,7 +685,6 @@ impl SerializableSegment for IndexMerger {
         let term_ord_mappings = self.write_postings(serializer.get_postings_serializer())?;
         self.write_fieldnorms(serializer.get_fieldnorms_serializer())?;
         self.write_fast_fields(serializer.get_fast_field_serializer(), term_ord_mappings)?;
-        self.write_storable_fields(serializer.get_store_writer())?;
         serializer.close()?;
         Ok(self.max_doc)
     }
```
```diff
@@ -3,12 +3,10 @@ use crate::core::SegmentComponent;
 use crate::fastfield::FastFieldSerializer;
 use crate::fieldnorm::FieldNormsSerializer;
 use crate::postings::InvertedIndexSerializer;
-use crate::store::StoreWriter;
 
 /// Segment serializer is in charge of laying out on disk
 /// the data accumulated and sorted by the `SegmentWriter`.
 pub struct SegmentSerializer {
-    store_writer: StoreWriter,
     fast_field_serializer: FastFieldSerializer,
     fieldnorms_serializer: FieldNormsSerializer,
     postings_serializer: InvertedIndexSerializer,
@@ -17,8 +15,6 @@ pub struct SegmentSerializer {
 impl SegmentSerializer {
     /// Creates a new `SegmentSerializer`.
     pub fn for_segment(segment: &mut Segment) -> crate::Result<SegmentSerializer> {
-        let store_write = segment.open_write(SegmentComponent::STORE)?;
-
         let fast_field_write = segment.open_write(SegmentComponent::FASTFIELDS)?;
         let fast_field_serializer = FastFieldSerializer::from_write(fast_field_write)?;
 
@@ -27,7 +23,6 @@ impl SegmentSerializer {
 
         let postings_serializer = InvertedIndexSerializer::open(segment)?;
         Ok(SegmentSerializer {
-            store_writer: StoreWriter::new(store_write),
             fast_field_serializer,
             fieldnorms_serializer,
             postings_serializer,
@@ -49,16 +44,10 @@ impl SegmentSerializer {
         &mut self.fieldnorms_serializer
     }
 
-    /// Accessor to the `StoreWriter`.
-    pub fn get_store_writer(&mut self) -> &mut StoreWriter {
-        &mut self.store_writer
-    }
-
     /// Finalize the segment serialization.
     pub fn close(self) -> crate::Result<()> {
         self.fast_field_serializer.close()?;
         self.postings_serializer.close()?;
-        self.store_writer.close()?;
         self.fieldnorms_serializer.close()?;
         Ok(())
     }
```
```diff
@@ -18,7 +18,7 @@ use crate::indexer::SegmentSerializer;
 use crate::indexer::{DefaultMergePolicy, MergePolicy};
 use crate::indexer::{MergeCandidate, MergeOperation};
 use crate::schema::Schema;
-use crate::Opstamp;
+use crate::{Opstamp, SegmentComponent};
 use futures::channel::oneshot;
 use futures::executor::{ThreadPool, ThreadPoolBuilder};
 use futures::future::Future;
@@ -134,8 +134,10 @@ fn merge(
     // ... we just serialize this index merger in our new segment to merge the two segments.
     let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?;
 
-    let num_docs = merger.write(segment_serializer)?;
+    let store_wrt = merged_segment.open_write(SegmentComponent::STORE)?;
+    merger.write_storable_fields(store_wrt)?;
+
+    let num_docs = merger.write(segment_serializer)?;
     let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs);
 
     Ok(SegmentEntry::new(segment_meta, delete_cursor, None))
```
```diff
@@ -11,13 +11,15 @@ use crate::schema::Schema;
 use crate::schema::Term;
 use crate::schema::Value;
 use crate::schema::{Field, FieldEntry};
+use crate::store::StoreWriter;
 use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
 use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
 use crate::tokenizer::{TokenStreamChain, Tokenizer};
-use crate::DocId;
 use crate::Opstamp;
+use crate::{DocId, SegmentComponent};
 use std::io;
 use std::str;
+use crate::directory::SpillingWriter;
@@ -43,11 +45,12 @@ fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
 pub struct SegmentWriter {
     max_doc: DocId,
     multifield_postings: MultiFieldPostingsWriter,
-    segment_serializer: SegmentSerializer,
+    segment: Segment,
     fast_field_writers: FastFieldsWriter,
     fieldnorms_writer: FieldNormsWriter,
     doc_opstamps: Vec<Opstamp>,
     tokenizers: Vec<Option<TextAnalyzer>>,
+    store_writer: StoreWriter<SpillingWriter>,
 }
@@ -62,11 +65,10 @@ impl SegmentWriter {
     /// - schema
     pub fn for_segment(
         memory_budget: usize,
-        mut segment: Segment,
+        segment: Segment,
         schema: &Schema,
     ) -> crate::Result<SegmentWriter> {
         let table_num_bits = initial_table_size(memory_budget)?;
-        let segment_serializer = SegmentSerializer::for_segment(&mut segment)?;
         let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
         let tokenizers = schema
             .fields()
@@ -82,14 +84,22 @@ impl SegmentWriter {
                 },
             )
             .collect();
+        let mut segment_clone = segment.clone();
+        let spilling_wrt = SpillingWriter::new(1_000, Box::new(move || {
+            segment_clone
+                .open_write(SegmentComponent::STORE)
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
+        }));
+        let store_writer = StoreWriter::new(spilling_wrt);
         Ok(SegmentWriter {
             max_doc: 0,
             multifield_postings,
             fieldnorms_writer: FieldNormsWriter::for_schema(schema),
-            segment_serializer,
+            segment,
             fast_field_writers: FastFieldsWriter::from_schema(schema),
             doc_opstamps: Vec::with_capacity(1_000),
             tokenizers,
+            store_writer,
         })
     }
@@ -99,11 +109,14 @@ impl SegmentWriter {
     /// be used afterwards.
     pub fn finalize(mut self) -> crate::Result<Vec<u64>> {
         self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc);
+        let spilling_wrt = self.store_writer.close()?;
+        spilling_wrt.flush_and_finalize()?;
+        let segment_serializer = SegmentSerializer::for_segment(&mut self.segment)?;
         write(
             &self.multifield_postings,
             &self.fast_field_writers,
             &self.fieldnorms_writer,
-            self.segment_serializer,
+            segment_serializer,
         )?;
         Ok(self.doc_opstamps)
     }
@@ -246,8 +259,7 @@ impl SegmentWriter {
         }
         doc.filter_fields(|field| schema.get_field_entry(field).is_stored());
         doc.prepare_for_store();
-        let doc_writer = self.segment_serializer.get_store_writer();
-        doc_writer.store(&doc)?;
+        self.store_writer.store(&doc)?;
         self.max_doc += 1;
         Ok(())
     }
```
```diff
@@ -98,9 +98,6 @@
 //! [literate programming](https://tantivy-search.github.io/examples/basic_search.html) /
 //! [source code](https://github.com/tantivy-search/tantivy/blob/master/examples/basic_search.rs))
 
-#[macro_use]
-extern crate serde_derive;
-
 #[cfg_attr(test, macro_use)]
 extern crate serde_json;
 
@@ -173,6 +170,7 @@ pub use crate::schema::{Document, Term};
 use std::fmt;
 
 use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
 
 /// Index format version.
 const INDEX_FORMAT_VERSION: u32 = 1;
```
```diff
@@ -1,4 +1,5 @@
 use crate::{DocId, TantivyError};
+use serde::Serialize;
 
 pub(crate) fn does_not_match(doc: DocId) -> TantivyError {
     TantivyError::InvalidArgument(format!("Document #({}) does not match", doc))
```
```diff
@@ -4,6 +4,7 @@ use crate::common::VInt;
 use crate::tokenizer::PreTokenizedString;
 use crate::DateTime;
 use itertools::Itertools;
+use serde;
 use std::io::{self, Read, Write};
 
 /// Tantivy's Document is the object that can
@@ -16,7 +17,7 @@ use std::io::{self, Read, Write};
 
 /// Documents are really just a list of couple `(field, value)`.
 /// In this list, one field may appear more than once.
-#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)]
 pub struct Document {
     field_values: Vec<FieldValue>,
 }
```
```diff
@@ -1,11 +1,14 @@
 use crate::common::BinarySerializable;
+use serde;
 use std::io;
 use std::io::Read;
 use std::io::Write;
 
 /// `Field` is represented by an unsigned 32-bit integer type
 /// The schema holds the mapping between field names and `Field` objects.
-#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
+#[derive(
+    Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, serde::Serialize, serde::Deserialize,
+)]
 pub struct Field(u32);
 
 impl Field {
```
```diff
@@ -1,12 +1,13 @@
 use crate::common::BinarySerializable;
 use crate::schema::Field;
 use crate::schema::Value;
+use serde;
 use std::io;
 use std::io::Read;
 use std::io::Write;
 
 /// `FieldValue` holds together a `Field` and its `Value`.
-#[derive(Debug, Clone, Ord, PartialEq, Eq, PartialOrd, Serialize, Deserialize)]
+#[derive(Debug, Clone, Ord, PartialEq, Eq, PartialOrd, serde::Serialize, serde::Deserialize)]
 pub struct FieldValue {
     field: Field,
     value: Value,
```
```diff
@@ -1,3 +1,5 @@
+use serde::{Deserialize, Serialize};
+
 /// `IndexRecordOption` describes an amount information associated
 /// to a given indexed field.
 ///
```
```diff
@@ -1,4 +1,5 @@
 use crate::schema::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag};
+use serde::{Deserialize, Serialize};
 use std::ops::BitOr;
 
 /// Express whether a field is single-value or multi-valued.
```
```diff
@@ -1,4 +1,5 @@
 use crate::schema::Value;
+use serde::Serialize;
 use std::collections::BTreeMap;
 
 /// Internal representation of a document used for JSON
```
```diff
@@ -1,6 +1,7 @@
 use crate::schema::flags::SchemaFlagList;
 use crate::schema::flags::StoredFlag;
 use crate::schema::IndexRecordOption;
+use serde::{Deserialize, Serialize};
 use std::borrow::Cow;
 use std::ops::BitOr;
 
```
```diff
@@ -11,6 +11,7 @@ under-count actual resultant space usage by up to 4095 bytes per file.
 
 use crate::schema::Field;
 use crate::SegmentComponent;
+use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 
 /// Indicates space usage in bytes
```
```diff
@@ -3,8 +3,6 @@ use super::skiplist::SkipListBuilder;
 use super::StoreReader;
 use crate::common::CountingWriter;
 use crate::common::{BinarySerializable, VInt};
-use crate::directory::TerminatingWrite;
-use crate::directory::WritePtr;
 use crate::schema::Document;
 use crate::DocId;
 use std::io::{self, Write};
@@ -19,20 +17,20 @@ const BLOCK_SIZE: usize = 16_384;
 ///
 /// The skip list index on the other hand, is built in memory.
 ///
-pub struct StoreWriter {
+pub struct StoreWriter<W: io::Write> {
     doc: DocId,
     offset_index_writer: SkipListBuilder<u64>,
-    writer: CountingWriter<WritePtr>,
+    writer: CountingWriter<W>,
     intermediary_buffer: Vec<u8>,
     current_block: Vec<u8>,
 }
 
-impl StoreWriter {
+impl<W: io::Write> StoreWriter<W> {
     /// Create a store writer.
     ///
     /// The store writer will writes blocks on disc as
     /// document are added.
-    pub fn new(writer: WritePtr) -> StoreWriter {
+    pub fn new(writer: W) -> StoreWriter<W> {
         StoreWriter {
             doc: 0,
             offset_index_writer: SkipListBuilder::new(4),
@@ -102,7 +100,7 @@ impl StoreWriter {
     ///
     /// Compress the last unfinished block if any,
     /// and serializes the skip list index on disc.
-    pub fn close(mut self) -> io::Result<()> {
+    pub fn close(mut self) -> io::Result<W> {
         if !self.current_block.is_empty() {
             self.write_and_compress_block()?;
         }
@@ -110,6 +108,9 @@ impl StoreWriter {
         self.offset_index_writer.write(&mut self.writer)?;
         header_offset.serialize(&mut self.writer)?;
         self.doc.serialize(&mut self.writer)?;
-        self.writer.terminate()
+        self.writer.flush()?;
+        let (wrt, _) = self.writer.finish()?;
+        Ok(wrt)
     }
 }
```
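With `StoreWriter` generic over `io::Write` and `close` handing back the inner writer, the store can be staged into any sink and recovered afterwards, which is what lets the `SegmentWriter` route it through a `SpillingWriter` and lets the merger pass a plain `WritePtr`. A hedged in-crate sketch against a `Vec<u8>` sink (function and field names are illustrative):

```rust
use crate::doc;
use crate::schema::{Schema, TEXT};
use crate::store::StoreWriter;
use std::io;

fn store_into_memory() -> io::Result<()> {
    let mut schema_builder = Schema::builder();
    let text = schema_builder.add_text_field("text", TEXT);
    let _schema = schema_builder.build();

    // Any io::Write can now back the store; a Vec<u8> keeps everything in memory.
    let mut store_writer: StoreWriter<Vec<u8>> = StoreWriter::new(Vec::new());
    store_writer.store(&doc!(text => "hello"))?;

    // close() compresses the last block, writes the skip list index,
    // and returns the sink.
    let bytes: Vec<u8> = store_writer.close()?;
    assert!(!bytes.is_empty());
    Ok(())
}
```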
```diff
@@ -1,6 +1,7 @@
 use super::{Token, TokenFilter, TokenStream};
 use crate::tokenizer::BoxTokenStream;
 use rust_stemmers::{self, Algorithm};
+use serde::{Deserialize, Serialize};
 
 /// Available stemmer languages.
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone)]
```
```diff
@@ -1,4 +1,5 @@
 use crate::tokenizer::{BoxTokenStream, Token, TokenStream, TokenStreamChain};
+use serde::{Deserialize, Serialize};
 use std::cmp::Ordering;
 
 /// Struct representing pre-tokenized text
```
```diff
@@ -1,4 +1,5 @@
 use crate::tokenizer::TokenStreamChain;
+use serde::{Deserialize, Serialize};
 /// The tokenizer module contains all of the tools used to process
 /// text in `tantivy`.
 use std::borrow::{Borrow, BorrowMut};
```