issue/43 docstamp -> opstamp

Paul Masurel
2017-02-04 16:21:24 +09:00
parent 09782858da
commit 0820992141
12 changed files with 216 additions and 55 deletions

View File

@@ -39,7 +39,7 @@ pub struct Index {
directory: Box<Directory>,
schema: Schema,
searcher_pool: Arc<Pool<Searcher>>,
- docstamp: u64,
+ opstamp: u64,
}
@@ -116,13 +116,13 @@ impl Index {
/// Creates a new index given a directory and an `IndexMeta`.
fn create_from_metas(directory: Box<Directory>, metas: IndexMeta) -> Result<Index> {
let schema = metas.schema.clone();
- let docstamp = metas.opstamp;
+ let opstamp = metas.opstamp;
// TODO log something if uncommitted is not empty.
let index = Index {
directory: directory,
schema: schema,
searcher_pool: Arc::new(Pool::new()),
- docstamp: docstamp,
+ opstamp: opstamp,
};
try!(index.load_searchers());
Ok(index)
@@ -141,12 +141,12 @@ impl Index {
Index::create_from_metas(directory.box_clone(), metas)
}
- /// Returns the index docstamp.
+ /// Returns the index opstamp.
///
- /// The docstamp is the number of documents that have been added
+ /// The opstamp is the number of operations (adds and deletes) applied
/// from the beginning of time, up to the moment of the last commit.
- pub fn docstamp(&self) -> u64 {
- self.docstamp
+ pub fn opstamp(&self) -> u64 {
+ self.opstamp
}
/// Creates a multithreaded writer.
@@ -291,7 +291,7 @@ impl Clone for Index {
directory: self.directory.box_clone(),
schema: self.schema.clone(),
searcher_pool: self.searcher_pool.clone(),
- docstamp: self.docstamp,
+ opstamp: self.opstamp,
}
}
}
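For context, a minimal sketch of how the renamed accessor chains together with `commit()` (the `index`, `text_field`, and schema are assumed to be built as in the test further below; this is illustrative only, not part of the commit):

    // commit() returns the opstamp of the last operation included in the commit,
    // and Index::opstamp() reports the opstamp recorded in the saved metas.
    let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
    index_writer.add_document(doc!(text_field=>"hello world")).unwrap();
    let commit_opstamp = index_writer.commit().unwrap();
    println!("committed at opstamp {}", commit_opstamp);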

View File

@@ -25,7 +25,6 @@ impl fmt::Debug for Segment {
}
}
/// Creates a new segment given an `Index` and a `SegmentId`
///
/// This function is defined here so that it stays private outside `tantivy`.
@@ -38,18 +37,20 @@ pub fn create_segment(index: Index, segment_id: SegmentId, commit_opstamp: u64)
}
impl Segment {
/// Returns our index's schema.
pub fn schema(&self,) -> Schema {
self.index.schema()
}
+ pub fn commit_opstamp(&self) -> u64 {
+ self.commit_opstamp
+ }
/// Returns the segment's id.
pub fn id(&self,) -> SegmentId {
self.segment_id
}
/// Returns the relative path of a component of our segment.
///

View File

@@ -23,7 +23,9 @@ impl SegmentComponent {
SegmentComponent::STORE => ".store".to_string(),
SegmentComponent::FASTFIELDS => ".fast".to_string(),
SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
- SegmentComponent::DELETE(opstamp) => format!("{}.del", opstamp)
+ SegmentComponent::DELETE(opstamp) => {
+ format!(".{}.del", opstamp)
+ }
}
}
}
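A standalone sketch of the corrected suffix (the free function below is a simplified stand-in for the match arm above, purely for illustration):

    fn delete_suffix(opstamp: u64) -> String {
        // The leading '.' separates the suffix from the segment id,
        // yielding file names of the form `<segment_id>.<opstamp>.del`.
        format!(".{}.del", opstamp)
    }

    assert_eq!(delete_suffix(42), ".42.del");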

View File

@@ -3,9 +3,13 @@ use core::Segment;
use core::SegmentId;
use core::SegmentComponent;
use schema::Term;
+ use bit_set::BitSet;
+ use common::HasLen;
+ use fastfield::delete::DeleteBitSet;
use store::StoreReader;
use schema::Document;
use directory::ReadOnlySource;
+ use directory::error::FileError;
use DocId;
use std::io;
use std::str;
@@ -44,6 +48,7 @@ pub struct SegmentReader {
store_reader: StoreReader,
fast_fields_reader: U32FastFieldsReader,
fieldnorms_reader: U32FastFieldsReader,
+ delete_bitset: DeleteBitSet,
positions_data: ReadOnlySource,
schema: Schema,
}
@@ -63,9 +68,13 @@ impl SegmentReader {
/// Returns the number of documents that are
/// not marked as deleted.
pub fn num_docs(&self) -> DocId {
- self.segment_info.max_doc
+ self.segment_info.max_doc - self.num_deleted_docs()
}
+ pub fn num_deleted_docs(&self) -> DocId {
+ self.delete_bitset.len() as DocId
+ }
/// Accessor to a segment's fast field reader given a field.
pub fn get_fast_field_reader(&self, field: Field) -> io::Result<U32FastFieldReader> {
let field_entry = self.schema.get_field_entry(field);
@@ -137,6 +146,15 @@ impl SegmentReader {
.open_read(SegmentComponent::POSITIONS)
.unwrap_or_else(|_| ReadOnlySource::empty());
// TODO 0u64
+ let delete_data_res = segment.open_read(SegmentComponent::DELETE(segment.commit_opstamp()));
+ let delete_bitset;
+ if let Err(FileError::FileDoesNotExist(_)) = delete_data_res {
+ delete_bitset = DeleteBitSet::empty();
+ }
+ else {
+ delete_bitset = DeleteBitSet::open(delete_data_res?);
+ }
let schema = segment.schema();
Ok(SegmentReader {
segment_info: segment_info,
@@ -146,6 +164,7 @@ impl SegmentReader {
store_reader: store_reader,
fast_fields_reader: fast_fields_reader,
fieldnorms_reader: fieldnorms_reader,
+ delete_bitset: delete_bitset,
positions_data: positions_data,
schema: schema,
})
@@ -214,9 +233,10 @@ impl SegmentReader {
FreqHandler::new_without_freq()
}
};
- Some(SegmentPostings::from_data(term_info.doc_freq, postings_data, freq_handler))
+ Some(SegmentPostings::from_data(term_info.doc_freq, postings_data, &self.delete_bitset, freq_handler))
}
/// Returns the posting list associated with a term.
pub fn read_postings_all_info(&self, term: &Term) -> Option<SegmentPostings> {
let field_entry = self.schema.get_field_entry(term.field());

View File

@@ -4,6 +4,7 @@ use std::io::Write;
use std::io;
use directory::ReadOnlySource;
use DocId;
+ use common::HasLen;
pub fn write_delete_bitset(delete_bitset: &BitSet, writer: &mut WritePtr) -> io::Result<()> {
let max_doc = delete_bitset.capacity();
@@ -28,23 +29,54 @@ pub fn write_delete_bitset(delete_bitset: &BitSet, writer: &mut WritePtr) -> io:
writer.flush()
}
- pub struct DeleteBitSet(ReadOnlySource);
+ #[derive(Clone)]
+ pub struct DeleteBitSet {
+ data: ReadOnlySource,
+ len: usize,
+ }
impl DeleteBitSet {
pub fn open(data: ReadOnlySource) -> DeleteBitSet {
- DeleteBitSet(data)
+ let num_deleted: usize = data
+ .as_slice()
+ .iter()
+ .map(|b| b.count_ones() as usize)
+ .sum();
+ DeleteBitSet {
+ data: data,
+ len: num_deleted,
+ }
}
+ pub fn empty() -> DeleteBitSet {
+ DeleteBitSet {
+ data: ReadOnlySource::empty(),
+ len: 0,
+ }
+ }
pub fn is_deleted(&self, doc: DocId) -> bool {
- let byte_offset = doc / 8u32;
- let b: u8 = (*self.0)[byte_offset as usize];
- let shift = (doc & 7u32) as u8;
- b & (1u8 << shift) != 0
+ if self.len == 0 {
+ false
+ }
+ else {
+ let byte_offset = doc / 8u32;
+ let b: u8 = (*self.data)[byte_offset as usize];
+ let shift = (doc & 7u32) as u8;
+ b & (1u8 << shift) != 0
+ }
}
}
+ impl HasLen for DeleteBitSet {
+ fn len(&self) -> usize {
+ self.len
+ }
+ }
#[cfg(test)]
mod tests {
@@ -67,6 +99,7 @@ mod tests {
for doc in 0..n {
assert_eq!(bitset.contains(doc), delete_bitset.is_deleted(doc as DocId));
}
+ assert_eq!(delete_bitset.len(), bitset.len());
}
}
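As a worked illustration of the bit arithmetic in `is_deleted` and of the popcount-based `len` (a self-contained sketch, independent of tantivy's `ReadOnlySource`):

    fn main() {
        // One byte encodes the delete flags of eight documents,
        // least-significant bit first.
        let data: [u8; 2] = [0b0000_0101, 0b0000_0001];
        let is_deleted = |doc: u32| {
            let byte_offset = doc / 8;
            let b = data[byte_offset as usize];
            let shift = (doc & 7) as u8;
            b & (1u8 << shift) != 0
        };
        assert!(is_deleted(0));  // bit 0 of byte 0
        assert!(is_deleted(2));  // bit 2 of byte 0
        assert!(is_deleted(8));  // bit 0 of byte 1
        assert!(!is_deleted(1));
        // `len` is the total popcount: 3 deleted documents here.
        let len: usize = data.iter().map(|b| b.count_ones() as usize).sum();
        assert_eq!(len, 3);
    }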

View File

@@ -75,8 +75,8 @@ pub struct IndexWriter {
delete_queue: DeleteQueue,
- uncommitted_docstamp: u64,
- committed_docstamp: u64,
+ uncommitted_opstamp: u64,
+ committed_opstamp: u64,
}
// IndexWriter cannot be sent to another thread.
@@ -211,7 +211,6 @@ impl IndexWriter {
// No more documents.
// Happens when there is a commit, or if the `IndexWriter`
// was dropped.
- opstamp = 0u64;
return Ok(())
}
@@ -282,8 +281,8 @@ impl IndexWriter {
delete_queue: delete_queue,
- committed_docstamp: index.docstamp(),
- uncommitted_docstamp: index.docstamp(),
+ committed_opstamp: index.opstamp(),
+ uncommitted_opstamp: index.opstamp(),
generation: 0,
@@ -338,7 +337,7 @@ impl IndexWriter {
/// After calling rollback, the index is in the same
/// state as it was after the last commit.
///
- /// The docstamp at the last commit is returned.
+ /// The opstamp at the last commit is returned.
pub fn rollback(&mut self) -> Result<u64> {
// by updating the generation in the segment updater,
@@ -380,9 +379,9 @@ impl IndexWriter {
Error::ErrorInThread("Error while waiting for rollback.".to_string())
)?;
- // reset the docstamp
- self.uncommitted_docstamp = self.committed_docstamp;
- Ok(self.committed_docstamp)
+ // reset the opstamp
+ self.uncommitted_opstamp = self.committed_opstamp;
+ Ok(self.committed_opstamp)
}
@@ -397,7 +396,7 @@ impl IndexWriter {
/// long as the hard disk is spared), it will be possible
/// to resume indexing from this point.
///
- /// Commit returns the `docstamp` of the last document
+ /// Commit returns the `opstamp` of the last operation
/// that made it into the commit.
///
pub fn commit(&mut self) -> Result<u64> {
@@ -406,9 +405,6 @@ impl IndexWriter {
// and recreate a new one channels.
self.recreate_document_channel();
- // Docstamp of the last document in this commit.
- self.committed_docstamp = self.uncommitted_docstamp;
let mut former_workers_join_handle = Vec::new();
swap(&mut former_workers_join_handle,
&mut self.workers_join_handle);
@@ -430,13 +426,15 @@ impl IndexWriter {
// This will move uncommitted segments to the state of
// committed segments.
- let future = self.segment_updater.commit(self.committed_docstamp);
+ self.committed_opstamp = self.stamp();
+ let future = self.segment_updater.commit(self.committed_opstamp);
// wait for the segment update thread to have processed the info
// TODO remove unwrap
future.wait().unwrap();
- Ok(self.committed_docstamp)
+ Ok(self.committed_opstamp)
}
@@ -446,8 +444,8 @@ impl IndexWriter {
}
fn stamp(&mut self) -> u64 {
- let opstamp = self.uncommitted_docstamp;
- self.uncommitted_docstamp += 1u64;
+ let opstamp = self.uncommitted_opstamp;
+ self.uncommitted_opstamp += 1u64;
opstamp
}
@@ -455,7 +453,7 @@ impl IndexWriter {
///
/// If the indexing pipeline is full, this call may block.
///
- /// The docstamp is an increasing `u64` that can
+ /// The opstamp is an increasing `u64` that can
/// be used by the client to align commits with its own
/// document queue.
///
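A self-contained model of the stamping scheme above (a simplified stand-in struct, not tantivy's actual types): every operation takes the current uncommitted stamp and increments it, `commit` mints one more stamp for itself, and `rollback` resets the uncommitted stamp to the committed one.

    struct Stamper {
        uncommitted_opstamp: u64,
        committed_opstamp: u64,
    }

    impl Stamper {
        fn stamp(&mut self) -> u64 {
            let opstamp = self.uncommitted_opstamp;
            self.uncommitted_opstamp += 1;
            opstamp
        }
        fn commit(&mut self) -> u64 {
            self.committed_opstamp = self.stamp();
            self.committed_opstamp
        }
        fn rollback(&mut self) -> u64 {
            self.uncommitted_opstamp = self.committed_opstamp;
            self.committed_opstamp
        }
    }

    fn main() {
        let mut s = Stamper { uncommitted_opstamp: 0, committed_opstamp: 0 };
        assert_eq!(s.stamp(), 0);    // an add
        assert_eq!(s.stamp(), 1);    // a delete counts as an operation too
        assert_eq!(s.commit(), 2);   // the commit itself takes a stamp
        s.stamp();                   // an uncommitted operation...
        assert_eq!(s.rollback(), 2); // ...discarded by rollback
    }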

View File

@@ -53,7 +53,7 @@ impl IndexMerger {
let mut max_doc = 0;
for segment in segments {
let reader = try!(SegmentReader::open(segment.clone()));
- max_doc += reader.max_doc();
+ max_doc += reader.num_docs();
readers.push(reader);
}
Ok(IndexMerger {

View File

@@ -57,11 +57,11 @@ fn create_metas(segment_manager: &SegmentManager, schema: Schema, opstamp: u64)
///
/// This method is not part of tantivy's public API
pub fn save_new_metas(schema: Schema,
- docstamp: u64,
+ opstamp: u64,
directory: &mut Directory)
-> Result<()> {
let segment_manager = SegmentManager::default();
- save_metas(&segment_manager, schema, docstamp, directory)
+ save_metas(&segment_manager, schema, opstamp, directory)
}
@@ -77,10 +77,10 @@ pub fn save_new_metas(schema: Schema,
/// This method is not part of tantivy's public API
pub fn save_metas(segment_manager: &SegmentManager,
schema: Schema,
- docstamp: u64,
+ opstamp: u64,
directory: &mut Directory)
-> Result<()> {
- let metas = create_metas(segment_manager, schema, docstamp);
+ let metas = create_metas(segment_manager, schema, opstamp);
let mut w = Vec::new();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas)));
Ok(directory
@@ -178,7 +178,7 @@ impl SegmentUpdater {
save_metas(
&segment_updater.0.segment_manager,
segment_updater.0.index.schema(),
- segment_updater.0.index.docstamp(),
+ opstamp,
directory.borrow_mut()).expect("Could not save metas.");
segment_updater.consider_merge_options();
})
@@ -284,7 +284,7 @@ impl SegmentUpdater {
save_metas(
&segment_updater.0.segment_manager,
segment_updater.0.index.schema(),
- segment_updater.0.index.docstamp(),
+ segment_updater.0.index.opstamp(),
directory.borrow_mut()).expect("Could not save metas.");
for segment_meta in merged_segment_metas {
segment_updater.0.index.delete_segment(segment_meta.segment_id);

View File

@@ -22,8 +22,40 @@ use indexer::index_writer::MARGIN_IN_BYTES;
use super::operation::AddOperation;
use bit_set::BitSet;
use indexer::document_receiver::DocumentReceiver;
+ use core::SegmentReader;
+ use postings::SegmentPostingsOption;
+ use postings::DocSet;
+ fn update_deleted_bitset(
+ segment_reader: &SegmentReader,
+ bitset: &mut BitSet,
+ delete_cursor: &mut DeleteQueueCursor,
+ limit_opstamp_opt: Option<u64>) -> bool {
+ let mut has_changed = false;
+ let limit_opstamp = limit_opstamp_opt.unwrap_or(u64::max_value());
+ loop {
+ if let Some(delete_op) = delete_cursor.peek() {
+ if delete_op.opstamp > limit_opstamp {
+ break;
+ }
+ if let Some(mut docset) = segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) {
+ while docset.advance() {
+ has_changed = true;
+ let deleted_doc = docset.doc();
+ bitset.insert(deleted_doc as usize);
+ }
+ }
+ }
+ else {
+ break;
+ }
+ delete_cursor.consume();
+ }
+ has_changed
+ }
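To clarify the control flow of `update_deleted_bitset`, here is a simplified, self-contained model of the same loop (the `DeleteOp` struct and its `doc_ids` field are hypothetical stand-ins for the delete queue and the postings lookup):

    struct DeleteOp {
        opstamp: u64,
        doc_ids: Vec<u32>, // stands in for the docs matched by read_postings on the term
    }

    // Applies delete operations whose opstamp is <= limit_opstamp,
    // flagging every matching document. Returns true if anything changed.
    fn apply_deletes(ops: &[DeleteOp], limit_opstamp: u64, bitset: &mut Vec<bool>) -> bool {
        let mut has_changed = false;
        for op in ops {
            if op.opstamp > limit_opstamp {
                break; // later deletes belong to a future commit
            }
            for &doc in &op.doc_ids {
                bitset[doc as usize] = true;
                has_changed = true;
            }
        }
        has_changed
    }

    fn main() {
        let ops = vec![
            DeleteOp { opstamp: 4, doc_ids: vec![1] },
            DeleteOp { opstamp: 7, doc_ids: vec![2, 3] },
        ];
        let mut bitset = vec![false; 6];
        assert!(apply_deletes(&ops, 5, &mut bitset));
        // Only the op stamped 4 applies under limit 5.
        assert_eq!(bitset, vec![false, true, false, false, false, false]);
    }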
struct DocumentDeleter<'a> {
limit_doc_id: DocId,
deleted_docs: &'a mut BitSet,
@@ -180,6 +212,7 @@ impl<'a> SegmentWriter<'a> {
.expect("Last doc opstamp called on an empty segment writer"))
}
+ /// TODO compute the bitset using the segment reader directly.
pub fn compute_deleted_bitset(&self, delete_queue_cursor: &mut DeleteQueueCursor) -> Option<BitSet> {
if let Some(first_opstamp) = self.doc_opstamps.first() {
if !delete_queue_cursor.skip_to(*first_opstamp) {

View File

@@ -290,6 +290,65 @@ mod tests {
}
}
+ #[test]
+ fn test_delete_postings() {
+ let mut schema_builder = SchemaBuilder::default();
+ let text_field = schema_builder.add_text_field("text", TEXT);
+ let schema = schema_builder.build();
+ let index = Index::create_in_ram(schema);
+ {
+ // writing the segment
+ let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
+ {
+ let doc = doc!(text_field=>"a b");
+ index_writer.add_document(doc).unwrap();
+ }
+ {
+ let doc = doc!(text_field=>" a c");
+ index_writer.add_document(doc).unwrap();
+ }
+ {
+ let doc = doc!(text_field=>" b c");
+ index_writer.add_document(doc).unwrap();
+ }
+ {
+ let doc = doc!(text_field=>" b d");
+ index_writer.add_document(doc).unwrap();
+ }
+ {
+ index_writer.delete_term(Term::from_field_text(text_field, "c"));
+ }
+ {
+ index_writer.delete_term(Term::from_field_text(text_field, "a"));
+ }
+ {
+ let doc = doc!(text_field=>" b c");
+ index_writer.add_document(doc).unwrap();
+ }
+ {
+ let doc = doc!(text_field=>" a");
+ index_writer.add_document(doc).unwrap();
+ }
+ index_writer.commit().unwrap();
+ }
+ {
+ index.load_searchers().unwrap();
+ let searcher = index.searcher();
+ let reader = searcher.segment_reader(0);
+ assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none());
+ let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap();
+ assert!(postings.advance());
+ assert_eq!(postings.doc(), 2);
+ assert!(postings.advance());
+ assert_eq!(postings.doc(), 3);
+ assert!(postings.advance());
+ assert_eq!(postings.doc(), 5);
+ assert!(!postings.advance());
+ }
+ }
#[test]
fn test_termfreq() {
let mut schema_builder = SchemaBuilder::default();

View File

@@ -55,6 +55,9 @@ pub trait DocSet {
/// Returns the current document
fn doc(&self) -> DocId;
/// TODO can impl trait for trait?
/// Advances the cursor to the next document.
/// Returns false if the `DocSet` has already
/// been entirely consumed.
@@ -67,6 +70,7 @@ pub trait DocSet {
}
}
impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
fn advance(&mut self) -> bool {
let unboxed: &mut TDocSet = self.borrow_mut();

View File

@@ -2,6 +2,7 @@ use compression::{NUM_DOCS_PER_BLOCK, BlockDecoder, VIntDecoder};
use DocId;
use postings::{Postings, FreqHandler, DocSet, HasLen};
use std::num::Wrapping;
+ use fastfield::delete::DeleteBitSet;
const EMPTY_DATA: [u8; 0] = [0u8; 0];
@@ -18,6 +19,7 @@ pub struct SegmentPostings<'a> {
freq_handler: FreqHandler,
remaining_data: &'a [u8],
cur: Wrapping<usize>,
+ delete_bitset: DeleteBitSet,
}
impl<'a> SegmentPostings<'a> {
@@ -41,7 +43,10 @@ impl<'a> SegmentPostings<'a> {
/// * `data` - data array. The complete data is not necessarily used.
/// * `freq_handler` - the freq handler is in charge of decoding
/// frequencies and/or positions
- pub fn from_data(len: u32, data: &'a [u8], freq_handler: FreqHandler) -> SegmentPostings<'a> {
+ pub fn from_data(len: u32,
+ data: &'a [u8],
+ delete_bitset: &'a DeleteBitSet,
+ freq_handler: FreqHandler) -> SegmentPostings<'a> {
SegmentPostings {
len: len as usize,
doc_offset: 0,
@@ -49,6 +54,7 @@ impl<'a> SegmentPostings<'a> {
freq_handler: freq_handler,
remaining_data: data,
cur: Wrapping(usize::max_value()),
+ delete_bitset: delete_bitset.clone(),
}
}
@@ -60,6 +66,7 @@ impl<'a> SegmentPostings<'a> {
block_decoder: BlockDecoder::new(),
freq_handler: FreqHandler::new_without_freq(),
remaining_data: &EMPTY_DATA,
+ delete_bitset: DeleteBitSet::empty(),
cur: Wrapping(usize::max_value()),
}
}
@@ -77,14 +84,18 @@ impl<'a> DocSet for SegmentPostings<'a> {
// advance() needs to be called a first time before the cursor points to a valid element.
#[inline]
fn advance(&mut self) -> bool {
- self.cur += Wrapping(1);
- if self.cur.0 >= self.len {
- return false;
- }
- if self.index_within_block() == 0 {
- self.load_next_block();
- }
- true
+ loop {
+ self.cur += Wrapping(1);
+ if self.cur.0 >= self.len {
+ return false;
+ }
+ if self.index_within_block() == 0 {
+ self.load_next_block();
+ }
+ if !self.delete_bitset.is_deleted(self.doc()) {
+ return true;
+ }
+ }
}
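A worked illustration of what the rewritten advance loop produces, independent of the block decoding (hypothetical data; the deleted set is chosen so the surviving sequence mirrors the one asserted in test_delete_postings above):

    fn main() {
        let deleted = [true, true, false, false, true, false]; // stand-in for DeleteBitSet
        // Advancing over docs 0..6 while skipping deleted ones yields 2, 3, 5.
        let alive: Vec<u32> = (0u32..6).filter(|&d| !deleted[d as usize]).collect();
        assert_eq!(alive, vec![2, 3, 5]);
    }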
#[inline]