issue/43 TODO hunt
@@ -147,7 +147,6 @@ impl SegmentReader {
             .open_read(SegmentComponent::POSITIONS)
             .unwrap_or_else(|_| ReadOnlySource::empty());
 
-        // TODO 0u64
         let delete_bitset =
             if segment.meta().has_deletes() {
                 let delete_data = segment.open_read(SegmentComponent::DELETE)?;
@@ -36,7 +36,7 @@ impl<T: BinarySerializable> LayerBuilder<T> {
     fn insert(&mut self, doc_id: DocId, value: &T) -> io::Result<Option<(DocId, u32)>> {
         self.remaining -= 1;
         self.len += 1;
-        let offset = self.written_size() as u32; // TODO not sure if we want after or here
+        let offset = self.written_size() as u32;
         try!(doc_id.serialize(&mut self.buffer));
         try!(value.serialize(&mut self.buffer));
         Ok(if self.remaining == 0 {
@@ -1,6 +1,22 @@
 use std::sync::Arc;
 use DocId;
 
+
+// The doc to opstamp mapping is used to identify which
+// documents should be deleted.
+//
+// Since the docset matching the query of a delete operation
+// is not computed immediately when the delete operation is received,
+// we need a way to evaluate, for each document,
+// whether the document was added before or after
+// the delete operation. This anteriority is assessed by
+// comparing the opstamp of the document.
+//
+// The doc to opstamp mapping stores precisely an array
+// indexed by doc id and storing the opstamp of the document.
+//
+// This mapping is (for the moment) strictly increasing
+// because of the way document ids are allocated.
 #[derive(Clone)]
 pub enum DocToOpstampMapping {
     WithMap(Arc<Vec<u64>>),
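
To make the anteriority rule above concrete, here is a minimal hand-rolled sketch (not code from this commit; the function name and data are illustrative):

    // A doc is a deletion candidate for a delete operation only if the
    // doc was added before the delete, i.e. its opstamp is smaller.
    fn is_candidate(doc_opstamps: &[u64], doc_id: usize, delete_opstamp: u64) -> bool {
        doc_opstamps[doc_id] < delete_opstamp
    }

    fn main() {
        let doc_opstamps = vec![1u64, 12, 17, 23];
        assert!(is_candidate(&doc_opstamps, 1, 15));  // doc 1 predates opstamp 15
        assert!(!is_candidate(&doc_opstamps, 3, 15)); // doc 3 was added after it
    }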
@@ -13,12 +29,18 @@ impl From<Vec<u64>> for DocToOpstampMapping {
     }
 }
 
 impl DocToOpstampMapping {
-    // TODO Unit test
-    pub fn compute_doc_limit(&self, opstamp: u64) -> DocId {
+
+    /// Given a target opstamp, returns the limit doc id L
+    /// such that for all doc ids D,
+    /// D >= L iff opstamp(D) >= `target_opstamp`.
+    //
+    // The edge case where the target opstamp equals some doc opstamp
+    // is in practice never hit.
+    pub fn compute_doc_limit(&self, target_opstamp: u64) -> DocId {
         match *self {
             DocToOpstampMapping::WithMap(ref doc_opstamps) => {
-                match doc_opstamps.binary_search(&opstamp) {
+                match doc_opstamps.binary_search(&target_opstamp) {
                     Ok(doc_id) => doc_id as DocId,
                     Err(doc_id) => doc_id as DocId,
                 }
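
Since the opstamp array is strictly increasing, `binary_search` lands on the same partition point whether the target is present (`Ok(i)`) or absent (`Err(i)`, the insertion index). A self-contained sketch of that property, with illustrative names:

    // Both arms return the first index whose opstamp is >= target_opstamp,
    // which is exactly the doc limit described in the doc comment above.
    fn doc_limit(doc_opstamps: &[u64], target_opstamp: u64) -> u32 {
        match doc_opstamps.binary_search(&target_opstamp) {
            Ok(index) | Err(index) => index as u32,
        }
    }

    fn main() {
        let opstamps = [1u64, 12, 17, 23];
        assert_eq!(doc_limit(&opstamps, 0), 0);  // no doc predates opstamp 0
        assert_eq!(doc_limit(&opstamps, 13), 2); // docs 0 and 1 predate opstamp 13
        assert_eq!(doc_limit(&opstamps, 30), 4); // every doc predates opstamp 30
    }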
@@ -28,3 +50,44 @@ impl DocToOpstampMapping {
         }
     }
 }
+#[cfg(test)]
+mod tests {
+
+    use super::DocToOpstampMapping;
+
+    #[test]
+    fn test_doc_to_opstamp_mapping_none() {
+        let doc_to_opstamp_mapping = DocToOpstampMapping::None;
+        assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(1), u32::max_value());
+    }
+
+    #[test]
+    fn test_doc_to_opstamp_mapping_complex() {
+        {
+            let doc_to_opstamp_mapping = DocToOpstampMapping::from(vec!());
+            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
+            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 0);
+        }
+        {
+            let doc_to_opstamp_mapping = DocToOpstampMapping::from(vec!(1u64));
+            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
+            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 1);
+        }
+        {
+            let doc_to_opstamp_mapping = DocToOpstampMapping::from(vec!(1u64, 12u64, 17u64, 23u64));
+            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
+            for i in 2u64..13u64 {
+                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 1);
+            }
+            for i in 13u64..18u64 {
+                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 2);
+            }
+            for i in 18u64..24u64 {
+                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 3);
+            }
+            for i in 24u64..30u64 {
+                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 4);
+            }
+        }
+    }
+}
@@ -209,8 +209,6 @@ impl IndexWriter {
         let mut segment_updater = self.segment_updater.clone();
         let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
 
-        // TODO fix this. the cursor might be too advanced
-        // at this point.
         let generation = self.generation;
 
         let join_handle: JoinHandle<Result<()>> =
@@ -369,7 +367,6 @@ impl IndexWriter {
         // pending add segment commands will be dismissed.
         self.generation += 1;
 
-        // TODO requires a new delete queue...
         let rollback_future = self.segment_updater.rollback(self.generation);
 
         // we cannot drop segment ready receiver yet
@@ -457,13 +454,11 @@ impl IndexWriter {
         // committed segments.
         self.committed_opstamp = self.stamp();
 
-        // TODO remove clone
-        let future = self.segment_updater.commit(self.committed_opstamp);
-
         // wait for the segment update thread to have processed the info
-        // TODO remove unwrap
-        future.wait().unwrap();
+        self.segment_updater
+            .commit(self.committed_opstamp)
+            .wait()?;
 
         self.delete_queue.clear();
         Ok(self.committed_opstamp)
     }
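
For context on the `future.wait().unwrap()` → `.wait()?` change: in futures 0.1, `Future::wait` blocks the current thread and returns `Result<Item, Error>`, so once the surrounding function returns a compatible `Result`, `?` can replace the `unwrap`. A minimal sketch of the pattern, with a placeholder error type (not tantivy's):

    extern crate futures;
    use futures::{future, Future};

    #[derive(Debug)]
    struct Error(&'static str);

    // wait() blocks until the future resolves, yielding Result<u64, Error>;
    // `?` then propagates the error instead of panicking.
    fn commit_like() -> Result<u64, Error> {
        let fut = future::ok::<u64, Error>(42);
        let opstamp = fut.wait()?;
        Ok(opstamp)
    }

    fn main() {
        println!("{:?}", commit_like());
    }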
@@ -149,9 +149,7 @@ impl IndexMerger {
         }
 
         assert!(min_val <= max_val);
 
-        // TODO test deleting all documents off the index.
-
         try!(fast_field_serializer.new_u32_fast_field(field, min_val, max_val));
         for (max_doc, u32_reader, delete_bitset) in u32_readers {
             for doc_id in 0..max_doc {
@@ -136,21 +136,21 @@ impl SegmentUpdater {
     }
 
 
-    fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(&self, f: F) -> impl Future<Item=T, Error=&'static str> {
+    fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(&self, f: F) -> impl Future<Item=T, Error=Error> {
         let me_clone = self.clone();
         self.0.pool.spawn_fn(move || {
             Ok(f(me_clone))
         })
     }
 
-    pub fn rollback(&mut self, generation: usize) -> impl Future<Item=(), Error=&'static str> {
+    pub fn rollback(&mut self, generation: usize) -> impl Future<Item=(), Error=Error> {
         self.0.generation.store(generation, Ordering::Release);
         self.run_async(|segment_updater| {
             segment_updater.0.segment_manager.rollback();
         })
     }
 
-    pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future<Item=bool, Error=&'static str> {
+    pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future<Item=bool, Error=Error> {
         if generation >= self.0.generation.load(Ordering::Acquire) {
             future::Either::A(self.run_async(|segment_updater| {
                 segment_updater.0.segment_manager.add_segment(segment_entry);
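
The `Error=&'static str` → `Error=Error` change gives every future spawned through `run_async` the crate's error type instead of a bare string, so callers can compose it with `?` and error-aware combinators. A rough sketch of the same shape with `futures-cpupool` 0.1 and a stand-in `Error` type (an assumption, not tantivy's actual definitions):

    extern crate futures;
    extern crate futures_cpupool;

    use futures::Future;
    use futures_cpupool::CpuPool;

    #[derive(Debug)]
    struct Error(&'static str);

    // Runs f on the pool; the closure's Ok result is wrapped into a
    // future whose error type is the typed Error rather than &'static str.
    fn run_async<T, F>(pool: &CpuPool, f: F) -> impl Future<Item = T, Error = Error>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        pool.spawn_fn(move || Ok::<T, Error>(f()))
    }

    fn main() {
        let pool = CpuPool::new(1);
        let value = run_async(&pool, || 7u32).wait().unwrap();
        assert_eq!(value, 7);
    }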
@@ -174,7 +174,7 @@ impl SegmentUpdater {
             .collect()
     }
 
-    pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=&'static str> {
+    pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=Error> {
         self.run_async(move |segment_updater| {
             let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes");
             let segment_entries = segment_metas
@@ -242,8 +242,7 @@ impl SegmentUpdater {
             .collect();
 
         // An IndexMerger is like a "view" of our merged segments.
-        // TODO unwrap
-        let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed");
+        let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?;
         let mut merged_segment = index.new_segment();
 
         // ... we just serialize this index merger in our new segment
@@ -284,7 +283,7 @@ impl SegmentUpdater {
 
     fn end_merge(&self,
                  merged_segment_metas: Vec<SegmentMeta>,
-                 resulting_segment_entry: SegmentEntry) -> impl Future<Item=(), Error=&'static str> {
+                 resulting_segment_entry: SegmentEntry) -> impl Future<Item=(), Error=Error> {
 
         self.run_async(move |segment_updater| {
             segment_updater.0.segment_manager.end_merge(&merged_segment_metas, resulting_segment_entry);
@@ -102,8 +102,6 @@ mod tests {
         }
         {
             let boolean_query = BooleanQuery::from(vec![(Occur::MustNot, make_term_query("d")),]);
-            // TODO optimize this use case : only MustNot subqueries... no need
-            // to read any postings.
            assert_eq!(matching_docs(&boolean_query), Vec::new());
         }
     }
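
The removed TODO still describes a real shortcut: a boolean query consisting only of MustNot clauses can never match, so an implementation could return an empty result without reading any postings. A hypothetical sketch of that early-exit check (none of these names come from tantivy):

    #[derive(PartialEq)]
    enum Occur { Must, Should, MustNot }

    // A boolean query whose clauses are all MustNot matches nothing,
    // so postings for its subqueries never need to be read.
    fn matches_nothing(occurs: &[Occur]) -> bool {
        !occurs.is_empty() && occurs.iter().all(|o| *o == Occur::MustNot)
    }

    fn main() {
        assert!(matches_nothing(&[Occur::MustNot]));
        assert!(!matches_nothing(&[Occur::Must, Occur::MustNot]));
    }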