mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-26 21:20:40 +00:00
issue/43 deletes
merge not working only updating uncommitted
This commit is contained in:
@@ -208,8 +208,8 @@ impl Index {
|
||||
/// Return a segment object given a `segment_id`
|
||||
///
|
||||
/// The segment may or may not exist.
|
||||
pub fn segment(&self, segment_id: SegmentId, commit_opstamp: u64) -> Segment {
|
||||
create_segment(self.clone(), segment_id, commit_opstamp)
|
||||
pub fn segment(&self, segment_id: SegmentId, opstamp: u64) -> Segment {
|
||||
create_segment(self.clone(), segment_id, opstamp)
|
||||
}
|
||||
|
||||
/// Return a reference to the index directory.
|
||||
|
||||
@@ -16,7 +16,7 @@ use directory::error::{FileError, OpenWriteError};
|
||||
pub struct Segment {
|
||||
index: Index,
|
||||
segment_id: SegmentId,
|
||||
commit_opstamp: u64,
|
||||
opstamp: u64,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Segment {
|
||||
@@ -28,11 +28,11 @@ impl fmt::Debug for Segment {
|
||||
/// Creates a new segment given an `Index` and a `SegmentId`
|
||||
///
|
||||
/// The function is here to make it private outside `tantivy`.
|
||||
pub fn create_segment(index: Index, segment_id: SegmentId, commit_opstamp: u64) -> Segment {
|
||||
pub fn create_segment(index: Index, segment_id: SegmentId, opstamp: u64) -> Segment {
|
||||
Segment {
|
||||
index: index,
|
||||
segment_id: segment_id,
|
||||
commit_opstamp: commit_opstamp,
|
||||
opstamp: opstamp,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,8 +43,8 @@ impl Segment {
|
||||
self.index.schema()
|
||||
}
|
||||
|
||||
pub fn commit_opstamp(&self) -> u64 {
|
||||
self.commit_opstamp
|
||||
pub fn opstamp(&self) -> u64 {
|
||||
self.opstamp
|
||||
}
|
||||
|
||||
/// Returns the segment's id.
|
||||
@@ -52,12 +52,21 @@ impl Segment {
|
||||
self.segment_id
|
||||
}
|
||||
|
||||
pub fn with_opstamp(&self, opstamp: u64) -> Segment {
|
||||
Segment {
|
||||
index: self.index.clone(),
|
||||
segment_id: self.segment_id.clone(),
|
||||
opstamp: opstamp,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the relative path of a component of our segment.
|
||||
///
|
||||
/// It just joins the segment id with the extension
|
||||
/// associated to a segment component.
|
||||
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
|
||||
self.segment_id.relative_path(component)
|
||||
let path_suffix = component.path_suffix(self.opstamp);
|
||||
PathBuf::from(self.segment_id.uuid_string() + &*path_suffix)
|
||||
}
|
||||
|
||||
/// Open one of the component file for read.
|
||||
|
||||
@@ -7,14 +7,12 @@ pub enum SegmentComponent {
|
||||
FIELDNORMS,
|
||||
TERMS,
|
||||
STORE,
|
||||
DELETE(u64), //< The argument here is an opstamp.
|
||||
// All of the deletes with an opstamp smaller or equal
|
||||
// to this opstamp have been taken in account.
|
||||
DELETE
|
||||
}
|
||||
|
||||
impl SegmentComponent {
|
||||
|
||||
pub fn path_suffix(&self)-> String {
|
||||
pub fn path_suffix(&self, opstamp: u64)-> String {
|
||||
match *self {
|
||||
SegmentComponent::POSITIONS => ".pos".to_string(),
|
||||
SegmentComponent::INFO => ".info".to_string(),
|
||||
@@ -23,9 +21,7 @@ impl SegmentComponent {
|
||||
SegmentComponent::STORE => ".store".to_string(),
|
||||
SegmentComponent::FASTFIELDS => ".fast".to_string(),
|
||||
SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
|
||||
SegmentComponent::DELETE(opstamp) => {
|
||||
format!(".{}.del", opstamp)
|
||||
}
|
||||
SegmentComponent::DELETE => {format!(".{}.del", opstamp)},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,11 +48,6 @@ impl SegmentId {
|
||||
pub fn uuid_string(&self,) -> String {
|
||||
self.0.simple().to_string()
|
||||
}
|
||||
|
||||
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
|
||||
let filename = self.uuid_string() + &*component.path_suffix();
|
||||
PathBuf::from(filename)
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for SegmentId {
|
||||
|
||||
@@ -147,7 +147,7 @@ impl SegmentReader {
|
||||
.unwrap_or_else(|_| ReadOnlySource::empty());
|
||||
|
||||
// TODO 0u64
|
||||
let delete_data_res = segment.open_read(SegmentComponent::DELETE(segment.commit_opstamp()));
|
||||
let delete_data_res = segment.open_read(SegmentComponent::DELETE);
|
||||
let delete_bitset;
|
||||
if let Err(FileError::FileDoesNotExist(_)) = delete_data_res {
|
||||
delete_bitset = DeleteBitSet::empty();
|
||||
@@ -262,6 +262,10 @@ impl SegmentReader {
|
||||
pub fn segment_id(&self) -> SegmentId {
|
||||
self.segment_id
|
||||
}
|
||||
|
||||
/// Returns whether `doc` is flagged as deleted, by delegating to the
/// reader's `delete_bitset`.
pub fn is_deleted(&self, doc: DocId) -> bool {
|
||||
self.delete_bitset.is_deleted(doc)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -54,9 +54,6 @@ mod tests {
|
||||
#[test]
|
||||
pub fn test_fastfield() {
|
||||
let test_fastfield = U32FastFieldReader::from(vec!(100,200,300));
|
||||
println!("{}", test_fastfield.get(0));
|
||||
println!("{}", test_fastfield.get(1));
|
||||
println!("{}", test_fastfield.get(2));
|
||||
assert_eq!(test_fastfield.get(0), 100);
|
||||
assert_eq!(test_fastfield.get(1), 200);
|
||||
assert_eq!(test_fastfield.get(2), 300);
|
||||
|
||||
@@ -97,7 +97,7 @@ impl DeleteQueueCursor {
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
self.consume();
|
||||
self.next();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
@@ -128,15 +128,17 @@ impl DeleteQueueCursor {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl Iterator for DeleteQueueCursor {
|
||||
type Item = DeleteOperation;
|
||||
|
||||
/// Returns a delete operation if an operation is available,
|
||||
/// None if the queue is empty.
|
||||
///
|
||||
/// (We are voluntarily not using the `Iterator` trait
|
||||
/// as a call to `consume` may return None once, and return
|
||||
/// `Some(...)` ulteriorily. While this is officially
|
||||
/// compatible with the `Iterator` specification, we judge
|
||||
/// this confusing.)
|
||||
pub fn consume(&mut self) -> Option<DeleteOperation> {
|
||||
/// This iterator may return None once, and return
|
||||
/// `Some(...)` later on.
|
||||
fn next(&mut self) -> Option<DeleteOperation> {
|
||||
let delete_position = self.peek();
|
||||
if delete_position.is_some() {
|
||||
self.pos += 1;
|
||||
@@ -197,7 +199,7 @@ mod tests {
|
||||
let mut delete_cursor_3 = delete_queue.cursor();
|
||||
let mut delete_cursor_3_b = delete_cursor_3.clone();
|
||||
|
||||
assert!(delete_cursor_3.consume().is_none());
|
||||
assert!(delete_cursor_3.next().is_none());
|
||||
assert!(delete_cursor_3.peek().is_none());
|
||||
|
||||
delete_queue.push_op(make_op(3));
|
||||
@@ -206,24 +208,24 @@ mod tests {
|
||||
assert_eq!(delete_cursor_3_b.peek(), Some(make_op(3)));
|
||||
let mut delete_cursor_3_c = delete_cursor_3_b.clone();
|
||||
|
||||
assert_eq!(delete_cursor_3_b.consume(), Some(make_op(3)));
|
||||
assert_eq!(delete_cursor_3_b.next(), Some(make_op(3)));
|
||||
let mut delete_cursor_4 = delete_cursor_3_b.clone();
|
||||
|
||||
assert_eq!(delete_cursor_3_b.peek(), Some(make_op(4)));
|
||||
assert_eq!(delete_cursor_3_b.consume(), Some(make_op(4)));
|
||||
assert_eq!(delete_cursor_3_b.next(), Some(make_op(4)));
|
||||
|
||||
assert_eq!(delete_cursor_3_c.consume(), Some(make_op(3)));
|
||||
assert_eq!(delete_cursor_3_c.next(), Some(make_op(3)));
|
||||
|
||||
assert!(delete_cursor_3_b.consume().is_none());
|
||||
assert_eq!(delete_cursor_3_c.consume(), Some(make_op(4)));
|
||||
assert!(delete_cursor_3_c.consume().is_none());
|
||||
assert!(delete_cursor_3_b.next().is_none());
|
||||
assert_eq!(delete_cursor_3_c.next(), Some(make_op(4)));
|
||||
assert!(delete_cursor_3_c.next().is_none());
|
||||
|
||||
assert_eq!(delete_cursor_3.peek(), Some(make_op(3)));
|
||||
assert_eq!(delete_cursor_3.consume(), Some(make_op(3)));
|
||||
assert!(delete_cursor_3_b.consume().is_none());
|
||||
assert_eq!(delete_cursor_3.next(), Some(make_op(3)));
|
||||
assert!(delete_cursor_3_b.next().is_none());
|
||||
|
||||
assert_eq!(delete_cursor_4.consume(), Some(make_op(4)));
|
||||
assert!(delete_cursor_4.consume().is_none());
|
||||
assert_eq!(delete_cursor_4.next(), Some(make_op(4)));
|
||||
assert!(delete_cursor_4.next().is_none());
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
use DocId;
|
||||
|
||||
pub trait DocumentReceiver {
|
||||
fn receive(&mut self, doc: DocId);
|
||||
}
|
||||
@@ -9,6 +9,11 @@ use indexer::SegmentEntry;
|
||||
use std::thread::JoinHandle;
|
||||
use indexer::MergePolicy;
|
||||
use indexer::SegmentWriter;
|
||||
use DocId;
|
||||
use bit_set::BitSet;
|
||||
use fastfield::delete::write_delete_bitset;
|
||||
use postings::SegmentPostingsOption;
|
||||
use postings::DocSet;
|
||||
use core::SegmentComponent;
|
||||
use super::directory_lock::DirectoryLock;
|
||||
use futures::Future;
|
||||
@@ -19,6 +24,7 @@ use std::thread;
|
||||
use futures::Canceled;
|
||||
use std::mem;
|
||||
use datastruct::stacker::Heap;
|
||||
use core::SegmentReader;
|
||||
use std::mem::swap;
|
||||
use chan;
|
||||
use core::SegmentMeta;
|
||||
@@ -84,6 +90,64 @@ impl !Send for IndexWriter {}
|
||||
impl !Sync for IndexWriter {}
|
||||
|
||||
|
||||
pub enum DocToOpstampMapping {
|
||||
WithMap(Vec<u64>),
|
||||
None
|
||||
}
|
||||
|
||||
impl DocToOpstampMapping {
|
||||
fn compute_doc_limit(&self, opstamp: u64) -> DocId {
|
||||
match *self {
|
||||
DocToOpstampMapping::WithMap(ref doc_opstamps) => {
|
||||
match doc_opstamps.binary_search(&opstamp) {
|
||||
Ok(doc_id) => doc_id as DocId,
|
||||
Err(doc_id) => doc_id as DocId,
|
||||
}
|
||||
}
|
||||
DocToOpstampMapping::None => DocId::max_value(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// TODO
|
||||
/// work on SegmentMeta
|
||||
pub fn advance_deletes(
|
||||
segment: &Segment,
|
||||
delete_cursor: &mut DeleteQueueCursor,
|
||||
doc_opstamps: DocToOpstampMapping) -> Result<(u64, BitSet)> {
|
||||
let segment_reader = SegmentReader::open(segment.clone())?;
|
||||
let mut delete_bitset = BitSet::new();
|
||||
for doc in 0u32..segment_reader.max_doc() {
|
||||
if segment_reader.is_deleted(doc) {
|
||||
delete_bitset.insert(doc as usize);
|
||||
}
|
||||
}
|
||||
let mut has_changed = false;
|
||||
let mut last_opstamp = segment.opstamp();//segment
|
||||
for delete_op in delete_cursor {
|
||||
// A delete operation should only affect
|
||||
// document that were inserted after it.
|
||||
//
|
||||
// Limit doc helps identify the first document
|
||||
// that may be affected by the delete operation.
|
||||
let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
|
||||
if let Some(mut docset) = segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) {
|
||||
while docset.advance() {
|
||||
has_changed = true;
|
||||
let deleted_doc = docset.doc();
|
||||
if deleted_doc < limit_doc {
|
||||
has_changed = true;
|
||||
delete_bitset.insert(deleted_doc as usize);
|
||||
}
|
||||
}
|
||||
}
|
||||
last_opstamp = delete_op.opstamp;
|
||||
}
|
||||
Ok((last_opstamp, delete_bitset))
|
||||
}
|
||||
|
||||
fn index_documents(heap: &mut Heap,
|
||||
mut segment: Segment,
|
||||
schema: &Schema,
|
||||
@@ -106,32 +170,28 @@ fn index_documents(heap: &mut Heap,
|
||||
let num_docs = segment_writer.max_doc();
|
||||
assert!(num_docs > 0);
|
||||
|
||||
let deleted_docset_opt = segment_writer.compute_deleted_bitset(delete_cursor);
|
||||
|
||||
let last_opstamp = segment_writer.last_opstamp();
|
||||
|
||||
let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
|
||||
|
||||
let num_deleted_docs;
|
||||
let (last_opstamp_after_deletes, deleted_docset) = advance_deletes(&segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))?;
|
||||
|
||||
if let Some(deleted_docset) = deleted_docset_opt {
|
||||
let mut delete_write = segment.open_write(SegmentComponent::DELETE(last_opstamp))?;
|
||||
delete::write_delete_bitset(&deleted_docset, &mut delete_write)?;
|
||||
num_deleted_docs = deleted_docset.len();
|
||||
}
|
||||
else {
|
||||
num_deleted_docs = 0;
|
||||
{
|
||||
let mut delete_file = segment.with_opstamp(last_opstamp_after_deletes).open_write(SegmentComponent::DELETE)?;
|
||||
write_delete_bitset(&deleted_docset, &mut delete_file)?;
|
||||
}
|
||||
let num_deleted_docs = deleted_docset.len() as DocId;
|
||||
|
||||
let segment_meta = SegmentMeta {
|
||||
segment_id: segment_id,
|
||||
num_docs: num_docs,
|
||||
num_deleted_docs: num_deleted_docs as u32,
|
||||
opstamp: last_opstamp,
|
||||
num_deleted_docs: num_deleted_docs,
|
||||
opstamp: last_opstamp_after_deletes,
|
||||
};
|
||||
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone());
|
||||
|
||||
try!(segment_writer.finalize());
|
||||
|
||||
|
||||
segment_updater
|
||||
.add_segment(generation, segment_entry)
|
||||
.wait()
|
||||
@@ -143,8 +203,6 @@ fn index_documents(heap: &mut Heap,
|
||||
impl IndexWriter {
|
||||
/// The index writer
|
||||
pub fn wait_merging_threads(mut self) -> Result<()> {
|
||||
|
||||
// let future = self.segment_updater.terminate();
|
||||
|
||||
// this will stop the indexing thread,
|
||||
// dropping the last reference to the segment_updater.
|
||||
@@ -165,9 +223,7 @@ impl IndexWriter {
|
||||
.wait_merging_thread()
|
||||
.map_err(|_|
|
||||
Error::ErrorInThread("Failed to join merging thread.".to_string())
|
||||
)?;
|
||||
// future.wait().unwrap(); // TODO do something with the result.
|
||||
Ok(())
|
||||
)
|
||||
}
|
||||
|
||||
/// Spawns a new worker thread for indexing.
|
||||
@@ -384,7 +440,6 @@ impl IndexWriter {
|
||||
Ok(self.committed_opstamp)
|
||||
}
|
||||
|
||||
|
||||
/// Commits all of the pending changes
|
||||
///
|
||||
/// A call to commit blocks.
|
||||
@@ -408,7 +463,7 @@ impl IndexWriter {
|
||||
let mut former_workers_join_handle = Vec::new();
|
||||
swap(&mut former_workers_join_handle,
|
||||
&mut self.workers_join_handle);
|
||||
|
||||
|
||||
for worker_handle in former_workers_join_handle {
|
||||
let indexing_worker_result = try!(worker_handle.join()
|
||||
.map_err(|e| Error::ErrorInThread(format!("{:?}", e))));
|
||||
@@ -416,6 +471,7 @@ impl IndexWriter {
|
||||
// add a new worker for the next generation.
|
||||
try!(self.add_indexing_worker());
|
||||
}
|
||||
|
||||
// here, because we join all of the worker threads,
|
||||
// all of the segment update for this commit have been
|
||||
// sent.
|
||||
@@ -426,8 +482,8 @@ impl IndexWriter {
|
||||
|
||||
// This will move uncommitted segments to the state of
|
||||
// committed segments.
|
||||
|
||||
self.committed_opstamp = self.stamp();
|
||||
|
||||
let future = self.segment_updater.commit(self.committed_opstamp);
|
||||
|
||||
// wait for the segment update thread to have processed the info
|
||||
|
||||
@@ -10,7 +10,6 @@ mod segment_manager;
|
||||
pub mod delete_queue;
|
||||
pub mod segment_updater;
|
||||
mod directory_lock;
|
||||
pub mod document_receiver;
|
||||
pub mod operation;
|
||||
|
||||
pub use self::segment_register::SegmentEntry;
|
||||
|
||||
@@ -8,7 +8,6 @@ use std::sync::{RwLockReadGuard, RwLockWriteGuard};
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
|
||||
struct SegmentRegisters {
|
||||
docstamp: u64,
|
||||
uncommitted: SegmentRegister,
|
||||
committed: SegmentRegister,
|
||||
}
|
||||
@@ -16,7 +15,6 @@ struct SegmentRegisters {
|
||||
impl Default for SegmentRegisters {
|
||||
fn default() -> SegmentRegisters {
|
||||
SegmentRegisters {
|
||||
docstamp: 0u64,
|
||||
uncommitted: SegmentRegister::default(),
|
||||
committed: SegmentRegister::default()
|
||||
}
|
||||
@@ -57,12 +55,23 @@ impl SegmentManager {
|
||||
pub fn from_segments(segment_metas: Vec<SegmentMeta>, delete_cursor: DeleteQueueCursor) -> SegmentManager {
|
||||
SegmentManager {
|
||||
registers: RwLock::new( SegmentRegisters {
|
||||
docstamp: 0u64, // TODO put the actual value
|
||||
uncommitted: SegmentRegister::default(),
|
||||
committed: SegmentRegister::new(segment_metas, delete_cursor),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the entries of the uncommitted register followed by the
/// entries of the committed register.
///
/// Both registers are read under a single read guard, so the two halves
/// come from the same snapshot: an entry cannot be moved from one register
/// to the other in between the two reads.
pub fn segment_entries(&self,) -> Vec<SegmentEntry> {
    let registers = self.read();
    let mut segment_entries = registers.uncommitted.segment_entries();
    segment_entries.extend(registers.committed.segment_entries());
    segment_entries
}
|
||||
|
||||
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
let registers = self.read();
|
||||
@@ -98,7 +107,6 @@ impl SegmentManager {
|
||||
for segment_entry in segment_entries {
|
||||
registers_lock.committed.add_segment_entry(segment_entry);
|
||||
}
|
||||
registers_lock.docstamp = docstamp;
|
||||
registers_lock.uncommitted.clear();
|
||||
}
|
||||
|
||||
@@ -151,7 +159,6 @@ impl Default for SegmentManager {
|
||||
fn default() -> SegmentManager {
|
||||
SegmentManager {
|
||||
registers: RwLock::new( SegmentRegisters {
|
||||
docstamp: 0u64, // TODO put the actual value
|
||||
uncommitted: SegmentRegister::default(),
|
||||
committed: SegmentRegister::default(),
|
||||
}),
|
||||
|
||||
@@ -32,6 +32,10 @@ impl SegmentEntry {
|
||||
pub fn segment_id(&self) -> SegmentId {
|
||||
self.meta.segment_id
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the delete queue cursor held by this entry.
pub fn delete_cursor(&mut self) -> &mut DeleteQueueCursor {
|
||||
&mut self.delete_cursor
|
||||
}
|
||||
|
||||
pub fn meta(&self) -> &SegmentMeta {
|
||||
&self.meta
|
||||
|
||||
@@ -10,9 +10,11 @@ use std::mem;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::ops::DerefMut;
|
||||
use futures::{Future, future};
|
||||
use fastfield::delete::write_delete_bitset;
|
||||
use futures::oneshot;
|
||||
use futures::Canceled;
|
||||
use std::thread;
|
||||
use core::SegmentComponent;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::RwLock;
|
||||
use core::SerializableSegment;
|
||||
@@ -22,6 +24,7 @@ use std::borrow::BorrowMut;
|
||||
use indexer::SegmentSerializer;
|
||||
use indexer::SegmentEntry;
|
||||
use schema::Schema;
|
||||
use indexer::index_writer::{advance_deletes, DocToOpstampMapping};
|
||||
use directory::Directory;
|
||||
use std::thread::JoinHandle;
|
||||
use std::sync::Arc;
|
||||
@@ -171,8 +174,26 @@ impl SegmentUpdater {
|
||||
}
|
||||
}
|
||||
|
||||
fn purge_deletes(&self, target_opstamp: u64) -> Result<()> {
|
||||
let uncommitted = self.0.segment_manager.segment_entries();
|
||||
for mut segment_entry in uncommitted {
|
||||
let mut segment = self.0.index.segment(segment_entry.meta().segment_id, segment_entry.meta().opstamp);
|
||||
let (_, deleted_docset) = advance_deletes(
|
||||
&segment,
|
||||
segment_entry.delete_cursor(),
|
||||
DocToOpstampMapping::None).unwrap();
|
||||
{
|
||||
let mut delete_file = segment.with_opstamp(target_opstamp).open_write(SegmentComponent::DELETE)?;
|
||||
write_delete_bitset(&deleted_docset, &mut delete_file)?;
|
||||
}
|
||||
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=&'static str> {
|
||||
self.run_async(move |segment_updater| {
|
||||
segment_updater.purge_deletes(opstamp).expect("Failed purge deletes");
|
||||
segment_updater.0.segment_manager.commit(opstamp);
|
||||
let mut directory = segment_updater.0.index.directory().box_clone();
|
||||
save_metas(
|
||||
|
||||
@@ -17,58 +17,10 @@ use postings::SpecializedPostingsWriter;
|
||||
use postings::{NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder};
|
||||
use indexer::segment_serializer::SegmentSerializer;
|
||||
use datastruct::stacker::Heap;
|
||||
use super::delete_queue::DeleteQueueCursor;
|
||||
use indexer::index_writer::MARGIN_IN_BYTES;
|
||||
use super::operation::AddOperation;
|
||||
use bit_set::BitSet;
|
||||
use indexer::document_receiver::DocumentReceiver;
|
||||
use core::SegmentReader;
|
||||
use postings::SegmentPostingsOption;
|
||||
use postings::DocSet;
|
||||
|
||||
|
||||
|
||||
fn update_deleted_bitset(
|
||||
segment_reader: &SegmentReader,
|
||||
bitset: &mut BitSet,
|
||||
delete_cursor: &mut DeleteQueueCursor,
|
||||
limit_opstamp_opt: Option<u64>) -> bool {
|
||||
let mut has_changed = false;
|
||||
let limit_opstamp = limit_opstamp_opt.unwrap_or(u64::max_value());
|
||||
loop {
|
||||
if let Some(delete_op) = delete_cursor.peek() {
|
||||
if delete_op.opstamp > limit_opstamp {
|
||||
break;
|
||||
}
|
||||
if let Some(mut docset) = segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) {
|
||||
while docset.advance() {
|
||||
has_changed = true;
|
||||
let deleted_doc = docset.doc();
|
||||
bitset.insert(deleted_doc as usize);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
break;
|
||||
}
|
||||
delete_cursor.consume();
|
||||
}
|
||||
has_changed
|
||||
}
|
||||
|
||||
struct DocumentDeleter<'a> {
|
||||
limit_doc_id: DocId,
|
||||
deleted_docs: &'a mut BitSet,
|
||||
}
|
||||
|
||||
impl<'a> DocumentReceiver for DocumentDeleter<'a> {
|
||||
fn receive(&mut self, doc: DocId) {
|
||||
if doc < self.limit_doc_id {
|
||||
self.deleted_docs.insert(doc as usize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A `SegmentWriter` is in charge of creating a segment index from a
|
||||
/// set of documents.
|
||||
///
|
||||
@@ -154,19 +106,18 @@ impl<'a> SegmentWriter<'a> {
|
||||
///
|
||||
/// Finalize consumes the `SegmentWriter`, so that it cannot
|
||||
/// be used afterwards.
|
||||
pub fn finalize(mut self) -> Result<()> {
|
||||
pub fn finalize(mut self) -> Result<Vec<u64>> {
|
||||
let segment_info = self.segment_info();
|
||||
for per_field_postings_writer in &mut self.per_field_postings_writers {
|
||||
per_field_postings_writer.close(self.heap);
|
||||
}
|
||||
try!(write(
|
||||
&self.per_field_postings_writers,
|
||||
write(&self.per_field_postings_writers,
|
||||
&self.fast_field_writers,
|
||||
&self.fieldnorms_writer,
|
||||
segment_info,
|
||||
self.segment_serializer,
|
||||
self.heap));
|
||||
Ok(())
|
||||
self.heap)?;
|
||||
Ok(self.doc_opstamps)
|
||||
}
|
||||
|
||||
/// Returns true iff the segment writer's buffer has reached capacity.
|
||||
@@ -180,14 +131,6 @@ impl<'a> SegmentWriter<'a> {
|
||||
self.heap.num_free_bytes() <= MARGIN_IN_BYTES
|
||||
}
|
||||
|
||||
fn compute_doc_limit(&self, opstamp: u64) -> DocId {
|
||||
let doc_id = match self.doc_opstamps.binary_search(&opstamp) {
|
||||
Ok(doc_id) => doc_id,
|
||||
Err(doc_id) => doc_id,
|
||||
};
|
||||
doc_id as DocId
|
||||
}
|
||||
|
||||
// pub fn compute_doc_mapping_after_delete(&self, mut delete_queue_cursor: DeleteQueueCursor) -> Vec<Option<DocId>> {
|
||||
// let delete_docs = self.compute_delete_mask(&mut delete_queue_cursor);
|
||||
// let max_doc: usize = self.max_doc as usize;
|
||||
@@ -211,41 +154,6 @@ impl<'a> SegmentWriter<'a> {
|
||||
.last()
|
||||
.expect("Last doc opstamp called on an empty segment writer"))
|
||||
}
|
||||
|
||||
/// TODO compute the bitset using the segment reader directly.
|
||||
pub fn compute_deleted_bitset(&self, delete_queue_cursor: &mut DeleteQueueCursor) -> Option<BitSet> {
|
||||
if let Some(first_opstamp) = self.doc_opstamps.first() {
|
||||
if !delete_queue_cursor.skip_to(*first_opstamp) {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
else {
|
||||
return None;
|
||||
}
|
||||
let last_opstamp = *self.doc_opstamps.last().unwrap();
|
||||
let mut deleted_docs = BitSet::with_capacity(self.max_doc as usize);
|
||||
while let Some(delete_operation) = delete_queue_cursor.peek() {
|
||||
if delete_operation.opstamp > last_opstamp {
|
||||
break;
|
||||
}
|
||||
// We can skip computing delete operations that
|
||||
// are older than our oldest document.
|
||||
//
|
||||
// They don't belong to this document anyway.
|
||||
let delete_term = delete_operation.term;
|
||||
let Field(field_id) = delete_term.field();
|
||||
let postings_writer: &Box<PostingsWriter> = &self.per_field_postings_writers[field_id as usize];
|
||||
let limit_doc_id = self.compute_doc_limit(delete_operation.opstamp);
|
||||
let mut document_deleter = DocumentDeleter {
|
||||
limit_doc_id: limit_doc_id,
|
||||
deleted_docs: &mut deleted_docs
|
||||
};
|
||||
postings_writer.push_documents(delete_term.value(), &mut document_deleter);
|
||||
delete_queue_cursor.consume();
|
||||
}
|
||||
Some(deleted_docs)
|
||||
}
|
||||
|
||||
|
||||
/// Indexes a new document
|
||||
///
|
||||
|
||||
34
src/lib.rs
34
src/lib.rs
@@ -300,19 +300,19 @@ mod tests {
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
|
||||
{
|
||||
{ // 0
|
||||
let doc = doc!(text_field=>"a b");
|
||||
index_writer.add_document(doc).unwrap();
|
||||
}
|
||||
{
|
||||
{ // 1
|
||||
let doc = doc!(text_field=>" a c");
|
||||
index_writer.add_document(doc).unwrap();
|
||||
}
|
||||
{
|
||||
{ // 2
|
||||
let doc = doc!(text_field=>" b c");
|
||||
index_writer.add_document(doc).unwrap();
|
||||
}
|
||||
{
|
||||
{ // 3
|
||||
let doc = doc!(text_field=>" b d");
|
||||
index_writer.add_document(doc).unwrap();
|
||||
}
|
||||
@@ -322,11 +322,11 @@ mod tests {
|
||||
{
|
||||
index_writer.delete_term(Term::from_field_text(text_field, "a"));
|
||||
}
|
||||
{
|
||||
{ // 4
|
||||
let doc = doc!(text_field=>" b c");
|
||||
index_writer.add_document(doc).unwrap();
|
||||
}
|
||||
{
|
||||
{ // 5
|
||||
let doc = doc!(text_field=>" a");
|
||||
index_writer.add_document(doc).unwrap();
|
||||
}
|
||||
@@ -337,14 +337,20 @@ mod tests {
|
||||
let searcher = index.searcher();
|
||||
let reader = searcher.segment_reader(0);
|
||||
assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none());
|
||||
let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap();
|
||||
assert!(postings.advance());
|
||||
assert_eq!(postings.doc(), 2);
|
||||
assert!(postings.advance());
|
||||
assert_eq!(postings.doc(), 3);
|
||||
assert!(postings.advance());
|
||||
assert_eq!(postings.doc(), 5);
|
||||
assert!(!postings.advance());
|
||||
{
|
||||
let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap();
|
||||
assert!(postings.advance());
|
||||
assert_eq!(postings.doc(), 5);
|
||||
assert!(!postings.advance());
|
||||
}
|
||||
{
|
||||
let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "b")).unwrap();
|
||||
assert!(postings.advance());
|
||||
assert_eq!(postings.doc(), 3);
|
||||
assert!(postings.advance());
|
||||
assert_eq!(postings.doc(), 4);
|
||||
assert!(!postings.advance());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,8 +7,7 @@ use postings::Recorder;
|
||||
use analyzer::SimpleTokenizer;
|
||||
use schema::Field;
|
||||
use analyzer::StreamingIterator;
|
||||
use indexer::document_receiver::DocumentReceiver;
|
||||
use datastruct::stacker::{HashMap, Entry, Heap};
|
||||
use datastruct::stacker::{HashMap, Heap};
|
||||
|
||||
/// The `PostingsWriter` is in charge of receiving documents
|
||||
/// and building a `Segment` in anonymous memory.
|
||||
@@ -28,10 +27,6 @@ pub trait PostingsWriter {
|
||||
/// The actual serialization format is handled by the `PostingsSerializer`.
|
||||
fn serialize(&self, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()>;
|
||||
|
||||
/// Push all documents associated with a given term to a
|
||||
/// given DocumentLister.
|
||||
fn push_documents(&self, term_val: &[u8], document_listener: &mut DocumentReceiver);
|
||||
|
||||
/// Closes all of the currently open `Recorder`'s.
|
||||
fn close(&mut self, heap: &Heap);
|
||||
|
||||
@@ -105,14 +100,6 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<'
|
||||
}
|
||||
|
||||
|
||||
fn push_documents(&self, term_val: &[u8], document_receiver: &mut DocumentReceiver) {
|
||||
if let Entry::Occupied(addr) = self.term_index.lookup(term_val) {
|
||||
let heap = self.term_index.heap();
|
||||
let recorder: &Rec = heap.get_ref(addr);
|
||||
recorder.push_documents(addr, document_receiver, heap);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn suscribe(&mut self, doc: DocId, position: u32, term: &Term, heap: &Heap) {
|
||||
let mut recorder = self.term_index.get_or_create(term);
|
||||
|
||||
@@ -2,7 +2,6 @@ use DocId;
|
||||
use std::io;
|
||||
use postings::PostingsSerializer;
|
||||
use datastruct::stacker::{ExpUnrolledLinkedList, Heap, HeapAllocable};
|
||||
use indexer::document_receiver::DocumentReceiver;
|
||||
|
||||
const EMPTY_ARRAY: [u32; 0] = [0u32; 0];
|
||||
const POSITION_END: u32 = 4294967295;
|
||||
@@ -29,11 +28,6 @@ pub trait Recorder: HeapAllocable {
|
||||
fn close_doc(&mut self, heap: &Heap);
|
||||
/// Returns the number of document that have been seen so far
|
||||
fn doc_freq(&self) -> u32;
|
||||
/// Push all documents to a given DocumentLister.
|
||||
fn push_documents(&self,
|
||||
self_addr: u32,
|
||||
document_receiver: &mut DocumentReceiver,
|
||||
heap: &Heap);
|
||||
/// Pushes the postings information to the serializer.
|
||||
fn serialize(&self,
|
||||
self_addr: u32,
|
||||
@@ -79,15 +73,6 @@ impl Recorder for NothingRecorder {
|
||||
self.doc_freq
|
||||
}
|
||||
|
||||
fn push_documents(&self,
|
||||
self_addr: u32,
|
||||
document_receiver: &mut DocumentReceiver,
|
||||
heap: &Heap) {
|
||||
for doc in self.stack.iter(self_addr, heap) {
|
||||
document_receiver.receive(doc);
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize(&self,
|
||||
self_addr: u32,
|
||||
serializer: &mut PostingsSerializer,
|
||||
@@ -145,17 +130,6 @@ impl Recorder for TermFrequencyRecorder {
|
||||
self.doc_freq
|
||||
}
|
||||
|
||||
fn push_documents(&self,
|
||||
self_addr: u32,
|
||||
document_receiver: &mut DocumentReceiver,
|
||||
heap: &Heap) {
|
||||
let mut doc_iter = self.stack.iter(self_addr, heap);
|
||||
while let Some(doc) = doc_iter.next() {
|
||||
doc_iter.next().expect("Panicked while trying to read a frequency");
|
||||
document_receiver.receive(doc);
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize(&self,
|
||||
self_addr: u32,
|
||||
serializer: &mut PostingsSerializer,
|
||||
@@ -216,22 +190,6 @@ impl Recorder for TFAndPositionRecorder {
|
||||
self.doc_freq
|
||||
}
|
||||
|
||||
fn push_documents(&self,
|
||||
self_addr: u32,
|
||||
document_receiver: &mut DocumentReceiver,
|
||||
heap: &Heap) {
|
||||
let mut positions_iter = self.stack.iter(self_addr, heap);
|
||||
while let Some(doc) = positions_iter.next() {
|
||||
document_receiver.receive(doc);
|
||||
loop {
|
||||
let position = positions_iter.next().expect("This should never happen. Pleasee report the bug.");
|
||||
if position == POSITION_END {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize(&self,
|
||||
self_addr: u32,
|
||||
serializer: &mut PostingsSerializer,
|
||||
|
||||
Reference in New Issue
Block a user