diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 425d290ce..8543f7104 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -31,6 +31,7 @@ impl InnerDeleteQueue { } } + #[derive(Default, Clone)] pub struct ReadOnlyDeletes(Vec>>); diff --git a/src/indexer/doc_opstamp_mapping.rs b/src/indexer/doc_opstamp_mapping.rs new file mode 100644 index 000000000..843002416 --- /dev/null +++ b/src/indexer/doc_opstamp_mapping.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; +use DocId; + +#[derive(Clone)] +pub enum DocToOpstampMapping { + WithMap(Arc>), + None +} + +impl From> for DocToOpstampMapping { + fn from(opstamps: Vec) -> DocToOpstampMapping { + DocToOpstampMapping::WithMap(Arc::new(opstamps)) + } +} + +impl DocToOpstampMapping { + // TODO Unit test + pub fn compute_doc_limit(&self, opstamp: u64) -> DocId { + match *self { + DocToOpstampMapping::WithMap(ref doc_opstamps) => { + match doc_opstamps.binary_search(&opstamp) { + Ok(doc_id) => doc_id as DocId, + Err(doc_id) => doc_id as DocId, + } + } + DocToOpstampMapping::None => DocId::max_value(), + } + } +} + diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 03b936229..b05d4f324 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -1,38 +1,36 @@ -use schema::Schema; -use schema::Document; -use super::operation::AddOperation; +use bit_set::BitSet; +use chan; use core::Index; use core::Segment; -use core::SegmentMeta; -use std::sync::Arc; -use core::SegmentId; -use indexer::operation::DeleteOperation; -use schema::Term; -use indexer::SegmentEntry; -use std::thread::JoinHandle; -use indexer::MergePolicy; -use indexer::SegmentWriter; -use DocId; -use bit_set::BitSet; -use fastfield::delete::write_delete_bitset; -use postings::SegmentPostingsOption; -use postings::DocSet; use core::SegmentComponent; -use super::directory_lock::DirectoryLock; -use futures::Future; -use std::clone::Clone; -use indexer::delete_queue::DeleteQueue; -use std::io; -use std::thread; -use futures::Canceled; -use std::mem; -use datastruct::stacker::Heap; +use core::SegmentId; +use core::SegmentMeta; use core::SegmentReader; -use std::mem::swap; -use chan; -use super::segment_updater::SegmentUpdater; -use Result; +use datastruct::stacker::Heap; use Error; +use fastfield::delete::write_delete_bitset; +use futures::Canceled; +use futures::Future; +use indexer::delete_queue::DeleteQueue; +use indexer::doc_opstamp_mapping::DocToOpstampMapping; +use indexer::MergePolicy; +use indexer::operation::DeleteOperation; +use indexer::SegmentEntry; +use indexer::SegmentWriter; +use postings::DocSet; +use postings::SegmentPostingsOption; +use Result; +use schema::Document; +use schema::Schema; +use schema::Term; +use std::io; +use std::mem; +use std::mem::swap; +use std::thread; +use std::thread::JoinHandle; +use super::directory_lock::DirectoryLock; +use super::operation::AddOperation; +use super::segment_updater::SegmentUpdater; // Size of the margin for the heap. A segment is closed when the remaining memory // in the heap goes below MARGIN_IN_BYTES. @@ -44,13 +42,9 @@ pub const HEAP_SIZE_LIMIT: u32 = MARGIN_IN_BYTES * 3u32; // Add document will block if the number of docs waiting in the queue to be indexed reaches PIPELINE_MAX_SIZE_IN_DOCS const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; - - type DocumentSender = chan::Sender; type DocumentReceiver = chan::Receiver; - - /// `IndexWriter` is the user entry-point to add document to an index. /// /// It manages a small number of indexing thread, as well as a shared @@ -90,35 +84,6 @@ pub struct IndexWriter { impl !Send for IndexWriter {} impl !Sync for IndexWriter {} - -// TODO move doc to opstamp mapping to its own file -#[derive(Clone)] -pub enum DocToOpstampMapping { - WithMap(Arc>), - None -} - -impl From> for DocToOpstampMapping { - fn from(opstamps: Vec) -> DocToOpstampMapping { - DocToOpstampMapping::WithMap(Arc::new(opstamps)) - } -} - -impl DocToOpstampMapping { - fn compute_doc_limit(&self, opstamp: u64) -> DocId { - match *self { - DocToOpstampMapping::WithMap(ref doc_opstamps) => { - match doc_opstamps.binary_search(&opstamp) { - Ok(doc_id) => doc_id as DocId, - Err(doc_id) => doc_id as DocId, - } - } - DocToOpstampMapping::None => DocId::max_value(), - } - } -} - - /// TODO /// work on SegmentMeta pub fn advance_deletes( diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index aea0965f5..478e851da 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -10,9 +10,11 @@ mod segment_manager; pub mod delete_queue; pub mod segment_updater; mod directory_lock; +mod segment_entry; +mod doc_opstamp_mapping; pub mod operation; -pub use self::segment_register::SegmentEntry; +pub use self::segment_entry::SegmentEntry; pub use self::segment_serializer::SegmentSerializer; pub use self::segment_writer::SegmentWriter; pub use self::index_writer::IndexWriter; diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs new file mode 100644 index 000000000..c8a917665 --- /dev/null +++ b/src/indexer/segment_entry.rs @@ -0,0 +1,71 @@ +use indexer::doc_opstamp_mapping::DocToOpstampMapping; +use core::SegmentMeta; +use core::SegmentId; +use std::fmt; + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum SegmentState { + Ready, + InMerge, +} + +impl SegmentState { + pub fn letter_code(&self,) -> char { + match *self { + SegmentState::InMerge => 'M', + SegmentState::Ready => 'R', + } + } +} + +#[derive(Clone)] +pub struct SegmentEntry { + meta: SegmentMeta, + state: SegmentState, + doc_to_opstamp: DocToOpstampMapping, +} + +impl SegmentEntry { + + pub fn new(segment_meta: SegmentMeta) -> SegmentEntry { + SegmentEntry { + meta: segment_meta, + state: SegmentState::Ready, + doc_to_opstamp: DocToOpstampMapping::None, + } + } + + pub fn doc_to_opstamp(&self) -> &DocToOpstampMapping { + &self.doc_to_opstamp + } + + pub fn state(&self) -> SegmentState { + self.state + } + + pub fn set_doc_to_opstamp(&mut self, doc_to_opstamp: DocToOpstampMapping) { + self.doc_to_opstamp = doc_to_opstamp; + } + + pub fn segment_id(&self) -> SegmentId { + self.meta.id() + } + + pub fn meta(&self) -> &SegmentMeta { + &self.meta + } + + pub fn start_merge(&mut self,) { + self.state = SegmentState::InMerge; + } + + pub fn is_ready(&self,) -> bool { + self.state == SegmentState::Ready + } +} + +impl fmt::Debug for SegmentEntry { + fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "SegmentEntry({:?}, {:?})", self.meta, self.state) + } +} diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index fedb53ac1..87b93be4c 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -1,73 +1,9 @@ use core::SegmentId; use std::collections::HashMap; use core::SegmentMeta; -use indexer::index_writer::DocToOpstampMapping; use std::fmt; use std::fmt::{Debug, Formatter}; - - -#[derive(Clone, PartialEq, Eq, Debug)] -pub enum SegmentState { - Ready, - InMerge, -} - -impl SegmentState { - fn letter_code(&self,) -> char { - match *self { - SegmentState::InMerge => 'M', - SegmentState::Ready => 'R', - } - } -} - -#[derive(Clone)] -pub struct SegmentEntry { - meta: SegmentMeta, - state: SegmentState, - doc_to_opstamp: DocToOpstampMapping, -} - -impl SegmentEntry { - - pub fn new(segment_meta: SegmentMeta) -> SegmentEntry { - SegmentEntry { - meta: segment_meta, - state: SegmentState::Ready, - doc_to_opstamp: DocToOpstampMapping::None, - } - } - - pub fn doc_to_opstamp(&self) -> &DocToOpstampMapping { - &self.doc_to_opstamp - } - - pub fn set_doc_to_opstamp(&mut self, doc_to_opstamp: DocToOpstampMapping) { - self.doc_to_opstamp = doc_to_opstamp; - } - - pub fn segment_id(&self) -> SegmentId { - self.meta.id() - } - - pub fn meta(&self) -> &SegmentMeta { - &self.meta - } - - fn start_merge(&mut self,) { - self.state = SegmentState::InMerge; - } - - fn is_ready(&self,) -> bool { - self.state == SegmentState::Ready - } -} - -impl Debug for SegmentEntry { - fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - write!(formatter, "SegmentEntry({:?}, {:?})", self.meta, self.state) - } -} +use indexer::segment_entry::SegmentEntry; @@ -88,7 +24,7 @@ impl Debug for SegmentRegister { fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { try!(write!(f, "SegmentRegister(")); for (k, v) in &self.segment_states { - try!(write!(f, "{}:{}, ", k.short_uuid_string(), v.state.letter_code())); + try!(write!(f, "{}:{}, ", k.short_uuid_string(), v.state().letter_code())); } try!(write!(f, ")")); Ok(()) @@ -105,7 +41,7 @@ impl SegmentRegister { self.segment_states .values() .filter(|segment_entry| segment_entry.is_ready()) - .map(|segment_entry| segment_entry.meta.clone()) + .map(|segment_entry| segment_entry.meta().clone()) .collect() } @@ -119,7 +55,7 @@ impl SegmentRegister { pub fn segment_metas(&self,) -> Vec { let mut segment_ids: Vec = self.segment_states .values() - .map(|segment_entry| segment_entry.meta.clone()) + .map(|segment_entry| segment_entry.meta().clone()) .collect(); segment_ids.sort_by_key(|meta| meta.id()); segment_ids @@ -145,7 +81,7 @@ impl SegmentRegister { } pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) { - let segment_id = segment_entry.meta.id(); + let segment_id = segment_entry.segment_id(); self.segment_states.insert(segment_id, segment_entry); } @@ -200,18 +136,18 @@ mod tests { let segment_entry = SegmentEntry::new(segment_meta); segment_register.add_segment_entry(segment_entry); } - assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state, SegmentState::Ready); + assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state(), SegmentState::Ready); assert_eq!(segment_register.segment_ids(), vec!(segment_id_a)); { let segment_meta = SegmentMeta::new(segment_id_b); let segment_entry = SegmentEntry::new(segment_meta); segment_register.add_segment_entry(segment_entry); } - assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state, SegmentState::Ready); + assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state(), SegmentState::Ready); segment_register.start_merge(&segment_id_a); segment_register.start_merge(&segment_id_b); - assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state, SegmentState::InMerge); - assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state, SegmentState::InMerge); + assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state(), SegmentState::InMerge); + assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state(), SegmentState::InMerge); segment_register.remove_segment(&segment_id_a); segment_register.remove_segment(&segment_id_b); {