issue/43 Isolated segment_entry / doc_opstamp_mapping

This commit is contained in:
Paul Masurel
2017-02-19 15:01:25 +09:00
parent 1c03d98a11
commit e3d2fca844
6 changed files with 142 additions and 137 deletions

View File

@@ -31,6 +31,7 @@ impl InnerDeleteQueue {
}
}
#[derive(Default, Clone)]
pub struct ReadOnlyDeletes(Vec<Arc<Vec<DeleteOperation>>>);

View File

@@ -0,0 +1,30 @@
use std::sync::Arc;
use DocId;
#[derive(Clone)]
pub enum DocToOpstampMapping {
WithMap(Arc<Vec<u64>>),
None
}
impl From<Vec<u64>> for DocToOpstampMapping {
fn from(opstamps: Vec<u64>) -> DocToOpstampMapping {
DocToOpstampMapping::WithMap(Arc::new(opstamps))
}
}
impl DocToOpstampMapping {
// TODO Unit test
pub fn compute_doc_limit(&self, opstamp: u64) -> DocId {
match *self {
DocToOpstampMapping::WithMap(ref doc_opstamps) => {
match doc_opstamps.binary_search(&opstamp) {
Ok(doc_id) => doc_id as DocId,
Err(doc_id) => doc_id as DocId,
}
}
DocToOpstampMapping::None => DocId::max_value(),
}
}
}

View File

@@ -1,38 +1,36 @@
use schema::Schema;
use schema::Document;
use super::operation::AddOperation;
use bit_set::BitSet;
use chan;
use core::Index;
use core::Segment;
use core::SegmentMeta;
use std::sync::Arc;
use core::SegmentId;
use indexer::operation::DeleteOperation;
use schema::Term;
use indexer::SegmentEntry;
use std::thread::JoinHandle;
use indexer::MergePolicy;
use indexer::SegmentWriter;
use DocId;
use bit_set::BitSet;
use fastfield::delete::write_delete_bitset;
use postings::SegmentPostingsOption;
use postings::DocSet;
use core::SegmentComponent;
use super::directory_lock::DirectoryLock;
use futures::Future;
use std::clone::Clone;
use indexer::delete_queue::DeleteQueue;
use std::io;
use std::thread;
use futures::Canceled;
use std::mem;
use datastruct::stacker::Heap;
use core::SegmentId;
use core::SegmentMeta;
use core::SegmentReader;
use std::mem::swap;
use chan;
use super::segment_updater::SegmentUpdater;
use Result;
use datastruct::stacker::Heap;
use Error;
use fastfield::delete::write_delete_bitset;
use futures::Canceled;
use futures::Future;
use indexer::delete_queue::DeleteQueue;
use indexer::doc_opstamp_mapping::DocToOpstampMapping;
use indexer::MergePolicy;
use indexer::operation::DeleteOperation;
use indexer::SegmentEntry;
use indexer::SegmentWriter;
use postings::DocSet;
use postings::SegmentPostingsOption;
use Result;
use schema::Document;
use schema::Schema;
use schema::Term;
use std::io;
use std::mem;
use std::mem::swap;
use std::thread;
use std::thread::JoinHandle;
use super::directory_lock::DirectoryLock;
use super::operation::AddOperation;
use super::segment_updater::SegmentUpdater;
// Size of the margin for the heap. A segment is closed when the remaining memory
// in the heap goes below MARGIN_IN_BYTES.
@@ -44,13 +42,9 @@ pub const HEAP_SIZE_LIMIT: u32 = MARGIN_IN_BYTES * 3u32;
// Add document will block if the number of docs waiting in the queue to be indexed reaches PIPELINE_MAX_SIZE_IN_DOCS
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
type DocumentSender = chan::Sender<AddOperation>;
type DocumentReceiver = chan::Receiver<AddOperation>;
/// `IndexWriter` is the user entry-point to add document to an index.
///
/// It manages a small number of indexing thread, as well as a shared
@@ -90,35 +84,6 @@ pub struct IndexWriter {
impl !Send for IndexWriter {}
impl !Sync for IndexWriter {}
// TODO move doc to opstamp mapping to its own file
#[derive(Clone)]
pub enum DocToOpstampMapping {
WithMap(Arc<Vec<u64>>),
None
}
impl From<Vec<u64>> for DocToOpstampMapping {
fn from(opstamps: Vec<u64>) -> DocToOpstampMapping {
DocToOpstampMapping::WithMap(Arc::new(opstamps))
}
}
impl DocToOpstampMapping {
fn compute_doc_limit(&self, opstamp: u64) -> DocId {
match *self {
DocToOpstampMapping::WithMap(ref doc_opstamps) => {
match doc_opstamps.binary_search(&opstamp) {
Ok(doc_id) => doc_id as DocId,
Err(doc_id) => doc_id as DocId,
}
}
DocToOpstampMapping::None => DocId::max_value(),
}
}
}
/// TODO
/// work on SegmentMeta
pub fn advance_deletes(

View File

@@ -10,9 +10,11 @@ mod segment_manager;
pub mod delete_queue;
pub mod segment_updater;
mod directory_lock;
mod segment_entry;
mod doc_opstamp_mapping;
pub mod operation;
pub use self::segment_register::SegmentEntry;
pub use self::segment_entry::SegmentEntry;
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;
pub use self::index_writer::IndexWriter;

View File

@@ -0,0 +1,71 @@
use indexer::doc_opstamp_mapping::DocToOpstampMapping;
use core::SegmentMeta;
use core::SegmentId;
use std::fmt;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum SegmentState {
Ready,
InMerge,
}
impl SegmentState {
pub fn letter_code(&self,) -> char {
match *self {
SegmentState::InMerge => 'M',
SegmentState::Ready => 'R',
}
}
}
#[derive(Clone)]
pub struct SegmentEntry {
meta: SegmentMeta,
state: SegmentState,
doc_to_opstamp: DocToOpstampMapping,
}
impl SegmentEntry {
pub fn new(segment_meta: SegmentMeta) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
state: SegmentState::Ready,
doc_to_opstamp: DocToOpstampMapping::None,
}
}
pub fn doc_to_opstamp(&self) -> &DocToOpstampMapping {
&self.doc_to_opstamp
}
pub fn state(&self) -> SegmentState {
self.state
}
pub fn set_doc_to_opstamp(&mut self, doc_to_opstamp: DocToOpstampMapping) {
self.doc_to_opstamp = doc_to_opstamp;
}
pub fn segment_id(&self) -> SegmentId {
self.meta.id()
}
pub fn meta(&self) -> &SegmentMeta {
&self.meta
}
pub fn start_merge(&mut self,) {
self.state = SegmentState::InMerge;
}
pub fn is_ready(&self,) -> bool {
self.state == SegmentState::Ready
}
}
impl fmt::Debug for SegmentEntry {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "SegmentEntry({:?}, {:?})", self.meta, self.state)
}
}

View File

@@ -1,73 +1,9 @@
use core::SegmentId;
use std::collections::HashMap;
use core::SegmentMeta;
use indexer::index_writer::DocToOpstampMapping;
use std::fmt;
use std::fmt::{Debug, Formatter};
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum SegmentState {
Ready,
InMerge,
}
impl SegmentState {
fn letter_code(&self,) -> char {
match *self {
SegmentState::InMerge => 'M',
SegmentState::Ready => 'R',
}
}
}
#[derive(Clone)]
pub struct SegmentEntry {
meta: SegmentMeta,
state: SegmentState,
doc_to_opstamp: DocToOpstampMapping,
}
impl SegmentEntry {
pub fn new(segment_meta: SegmentMeta) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
state: SegmentState::Ready,
doc_to_opstamp: DocToOpstampMapping::None,
}
}
pub fn doc_to_opstamp(&self) -> &DocToOpstampMapping {
&self.doc_to_opstamp
}
pub fn set_doc_to_opstamp(&mut self, doc_to_opstamp: DocToOpstampMapping) {
self.doc_to_opstamp = doc_to_opstamp;
}
pub fn segment_id(&self) -> SegmentId {
self.meta.id()
}
pub fn meta(&self) -> &SegmentMeta {
&self.meta
}
fn start_merge(&mut self,) {
self.state = SegmentState::InMerge;
}
fn is_ready(&self,) -> bool {
self.state == SegmentState::Ready
}
}
impl Debug for SegmentEntry {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "SegmentEntry({:?}, {:?})", self.meta, self.state)
}
}
use indexer::segment_entry::SegmentEntry;
@@ -88,7 +24,7 @@ impl Debug for SegmentRegister {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
try!(write!(f, "SegmentRegister("));
for (k, v) in &self.segment_states {
try!(write!(f, "{}:{}, ", k.short_uuid_string(), v.state.letter_code()));
try!(write!(f, "{}:{}, ", k.short_uuid_string(), v.state().letter_code()));
}
try!(write!(f, ")"));
Ok(())
@@ -105,7 +41,7 @@ impl SegmentRegister {
self.segment_states
.values()
.filter(|segment_entry| segment_entry.is_ready())
.map(|segment_entry| segment_entry.meta.clone())
.map(|segment_entry| segment_entry.meta().clone())
.collect()
}
@@ -119,7 +55,7 @@ impl SegmentRegister {
pub fn segment_metas(&self,) -> Vec<SegmentMeta> {
let mut segment_ids: Vec<SegmentMeta> = self.segment_states
.values()
.map(|segment_entry| segment_entry.meta.clone())
.map(|segment_entry| segment_entry.meta().clone())
.collect();
segment_ids.sort_by_key(|meta| meta.id());
segment_ids
@@ -145,7 +81,7 @@ impl SegmentRegister {
}
pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) {
let segment_id = segment_entry.meta.id();
let segment_id = segment_entry.segment_id();
self.segment_states.insert(segment_id, segment_entry);
}
@@ -200,18 +136,18 @@ mod tests {
let segment_entry = SegmentEntry::new(segment_meta);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state, SegmentState::Ready);
assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state(), SegmentState::Ready);
assert_eq!(segment_register.segment_ids(), vec!(segment_id_a));
{
let segment_meta = SegmentMeta::new(segment_id_b);
let segment_entry = SegmentEntry::new(segment_meta);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state, SegmentState::Ready);
assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state(), SegmentState::Ready);
segment_register.start_merge(&segment_id_a);
segment_register.start_merge(&segment_id_b);
assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state, SegmentState::InMerge);
assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state, SegmentState::InMerge);
assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state(), SegmentState::InMerge);
assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state(), SegmentState::InMerge);
segment_register.remove_segment(&segment_id_a);
segment_register.remove_segment(&segment_id_b);
{