mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-29 14:40:40 +00:00
issue/43 SegmentMeta refactoring
This commit is contained in:
@@ -186,7 +186,7 @@ impl Index {
|
||||
let metas = load_metas(self.directory())?;
|
||||
Ok(metas
|
||||
.committed_segments
|
||||
.iter()
|
||||
.into_iter()
|
||||
.map(|segment_meta| self.segment(segment_meta))
|
||||
.collect())
|
||||
}
|
||||
@@ -200,20 +200,14 @@ impl Index {
|
||||
delete_segment(self.directory(), segment_id);
|
||||
}
|
||||
|
||||
/// Return a segment object given a `segment_id`
|
||||
///
|
||||
/// The segment may or may not exist.
|
||||
// pub fn segment(&self, segment_id: SegmentId, opstamp: u64) -> Segment {
|
||||
// (self.clone(), segment_id, opstamp)
|
||||
// }
|
||||
|
||||
pub fn segment(&self, segment_meta: &SegmentMeta) -> Segment {
|
||||
create_segment(self.clone(), segment_meta.segment_id, segment_meta.opstamp)
|
||||
pub fn segment(&self, segment_meta: SegmentMeta) -> Segment {
|
||||
create_segment(self.clone(), segment_meta)
|
||||
}
|
||||
|
||||
/// Creates a new segment.
|
||||
pub fn new_segment(&self, opstamp: u64) -> Segment {
|
||||
create_segment(self.clone(), SegmentId::generate_random(), opstamp)
|
||||
pub fn new_segment(&self) -> Segment {
|
||||
let segment_meta = SegmentMeta::new(SegmentId::generate_random());
|
||||
create_segment(self.clone(), segment_meta)
|
||||
}
|
||||
|
||||
/// Return a reference to the index directory.
|
||||
|
||||
@@ -9,30 +9,29 @@ use indexer::segment_serializer::SegmentSerializer;
|
||||
use super::SegmentComponent;
|
||||
use core::Index;
|
||||
use std::result;
|
||||
use core::SegmentMeta;
|
||||
use directory::error::{FileError, OpenWriteError};
|
||||
|
||||
/// A segment is a piece of the index.
|
||||
#[derive(Clone)]
|
||||
pub struct Segment {
|
||||
index: Index,
|
||||
segment_id: SegmentId,
|
||||
opstamp: u64,
|
||||
meta: SegmentMeta,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Segment {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "Segment({:?})", self.segment_id.uuid_string())
|
||||
write!(f, "Segment({:?})", self.id().uuid_string())
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new segment given an `Index` and a `SegmentId`
|
||||
///
|
||||
/// The function is here to make it private outside `tantivy`.
|
||||
pub fn create_segment(index: Index, segment_id: SegmentId, opstamp: u64) -> Segment {
|
||||
pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment {
|
||||
Segment {
|
||||
index: index,
|
||||
segment_id: segment_id,
|
||||
opstamp: opstamp,
|
||||
meta: meta,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,20 +42,21 @@ impl Segment {
|
||||
self.index.schema()
|
||||
}
|
||||
|
||||
pub fn opstamp(&self) -> u64 {
|
||||
self.opstamp
|
||||
pub fn meta(&self,) -> &SegmentMeta {
|
||||
&self.meta
|
||||
}
|
||||
|
||||
/// Returns the segment's id.
|
||||
pub fn id(&self,) -> SegmentId {
|
||||
self.segment_id
|
||||
self.meta.segment_id
|
||||
}
|
||||
|
||||
pub fn with_opstamp(&self, opstamp: u64) -> Segment {
|
||||
pub fn with_delete_opstamp(self, opstamp: u64) -> Segment {
|
||||
let mut meta = self.meta;
|
||||
meta.delete_opstamp = Some(opstamp);
|
||||
Segment {
|
||||
index: self.index.clone(),
|
||||
segment_id: self.segment_id.clone(),
|
||||
opstamp: opstamp,
|
||||
index: self.index,
|
||||
meta: meta,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ impl Segment {
|
||||
/// associated to a segment component.
|
||||
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
|
||||
use self::SegmentComponent::*;
|
||||
let mut path = self.segment_id.uuid_string();
|
||||
let mut path = self.id().uuid_string();
|
||||
path.push_str(&*match component {
|
||||
POSITIONS => ".pos".to_string(),
|
||||
INFO => ".info".to_string(),
|
||||
@@ -75,7 +75,7 @@ impl Segment {
|
||||
STORE => ".store".to_string(),
|
||||
FASTFIELDS => ".fast".to_string(),
|
||||
FIELDNORMS => ".fieldnorm".to_string(),
|
||||
DELETE => {format!(".{}.del", self.opstamp)},
|
||||
DELETE => {format!(".{}.del", self.meta.delete_opstamp.unwrap_or(0))},
|
||||
});
|
||||
PathBuf::from(path)
|
||||
}
|
||||
|
||||
@@ -1,21 +1,23 @@
|
||||
use core::SegmentId;
|
||||
|
||||
|
||||
// TODO Option<DeleteMeta>
|
||||
|
||||
#[derive(Clone, Debug, RustcDecodable,RustcEncodable)]
|
||||
pub struct SegmentMeta {
|
||||
pub segment_id: SegmentId,
|
||||
pub num_docs: u32,
|
||||
pub num_deleted_docs: u32,
|
||||
pub opstamp: u64,
|
||||
pub delete_opstamp: Option<u64>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl SegmentMeta {
|
||||
pub fn new(segment_id: SegmentId, num_docs: u32) -> SegmentMeta {
|
||||
pub fn new(segment_id: SegmentId) -> SegmentMeta {
|
||||
SegmentMeta {
|
||||
segment_id: segment_id,
|
||||
num_docs: num_docs,
|
||||
num_docs: 0,
|
||||
num_deleted_docs: 0,
|
||||
opstamp: 0u64,
|
||||
delete_opstamp: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,14 +147,15 @@ impl SegmentReader {
|
||||
.unwrap_or_else(|_| ReadOnlySource::empty());
|
||||
|
||||
// TODO 0u64
|
||||
let delete_data_res = segment.open_read(SegmentComponent::DELETE);
|
||||
let delete_bitset;
|
||||
if let Err(FileError::FileDoesNotExist(_)) = delete_data_res {
|
||||
delete_bitset = DeleteBitSet::empty();
|
||||
}
|
||||
else {
|
||||
delete_bitset = DeleteBitSet::open(delete_data_res?);
|
||||
}
|
||||
let delete_bitset =
|
||||
if segment.meta().delete_opstamp.is_some() {
|
||||
let delete_data = segment.open_read(SegmentComponent::DELETE)?;
|
||||
DeleteBitSet::open(delete_data)
|
||||
}
|
||||
else {
|
||||
DeleteBitSet::empty()
|
||||
};
|
||||
|
||||
let schema = segment.schema();
|
||||
Ok(SegmentReader {
|
||||
segment_info: segment_info,
|
||||
|
||||
@@ -116,16 +116,11 @@ impl DocToOpstampMapping {
|
||||
pub fn advance_deletes(
|
||||
segment: &Segment,
|
||||
delete_cursor: &mut DeleteQueueCursor,
|
||||
doc_opstamps: DocToOpstampMapping) -> Result<(u64, BitSet)> {
|
||||
doc_opstamps: DocToOpstampMapping) -> Result<Option<(u64, BitSet)>> {
|
||||
let segment_reader = SegmentReader::open(segment.clone())?;
|
||||
let mut delete_bitset = BitSet::new();
|
||||
for doc in 0u32..segment_reader.max_doc() {
|
||||
if segment_reader.is_deleted(doc) {
|
||||
delete_bitset.insert(doc as usize);
|
||||
}
|
||||
}
|
||||
let mut has_changed = false;
|
||||
let mut last_opstamp = segment.opstamp();//segment
|
||||
let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
|
||||
|
||||
let mut last_opstamp_opt: Option<u64> = None;
|
||||
for delete_op in delete_cursor {
|
||||
// A delete operation should only affect
|
||||
// document that were inserted after it.
|
||||
@@ -135,17 +130,26 @@ pub fn advance_deletes(
|
||||
let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
|
||||
if let Some(mut docset) = segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) {
|
||||
while docset.advance() {
|
||||
has_changed = true;
|
||||
let deleted_doc = docset.doc();
|
||||
if deleted_doc < limit_doc {
|
||||
has_changed = true;
|
||||
delete_bitset.insert(deleted_doc as usize);
|
||||
}
|
||||
}
|
||||
last_opstamp_opt = Some(delete_op.opstamp);
|
||||
}
|
||||
last_opstamp = delete_op.opstamp;
|
||||
}
|
||||
Ok((last_opstamp, delete_bitset))
|
||||
|
||||
if let Some(last_opstamp) = last_opstamp_opt {
|
||||
for doc in 0u32..segment_reader.max_doc() {
|
||||
if segment_reader.is_deleted(doc) {
|
||||
delete_bitset.insert(doc as usize);
|
||||
}
|
||||
}
|
||||
Ok(Some((last_opstamp, delete_bitset)))
|
||||
}
|
||||
else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn index_documents(heap: &mut Heap,
|
||||
@@ -175,20 +179,25 @@ fn index_documents(heap: &mut Heap,
|
||||
|
||||
let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
|
||||
|
||||
let (last_opstamp_after_deletes, deleted_docset) = advance_deletes(&segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))?;
|
||||
|
||||
{
|
||||
let mut delete_file = segment.with_opstamp(last_opstamp_after_deletes).open_write(SegmentComponent::DELETE)?;
|
||||
write_delete_bitset(&deleted_docset, &mut delete_file)?;
|
||||
}
|
||||
let num_deleted_docs = deleted_docset.len() as DocId;
|
||||
|
||||
let segment_meta = SegmentMeta {
|
||||
segment_id: segment_id,
|
||||
num_docs: num_docs,
|
||||
num_deleted_docs: num_deleted_docs,
|
||||
opstamp: last_opstamp_after_deletes,
|
||||
};
|
||||
let segment_meta =
|
||||
if let Some((last_opstamp_after_deletes, deleted_docset)) = advance_deletes(&segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))? {
|
||||
let mut delete_file = segment.with_delete_opstamp(last_opstamp_after_deletes).open_write(SegmentComponent::DELETE)?;
|
||||
write_delete_bitset(&deleted_docset, &mut delete_file)?;
|
||||
SegmentMeta {
|
||||
segment_id: segment_id,
|
||||
num_docs: num_docs,
|
||||
num_deleted_docs: deleted_docset.len() as DocId,
|
||||
delete_opstamp: Some(last_opstamp_after_deletes),
|
||||
}
|
||||
}
|
||||
else {
|
||||
SegmentMeta {
|
||||
segment_id: segment_id,
|
||||
num_docs: num_docs,
|
||||
num_deleted_docs: 0,
|
||||
delete_opstamp: None,
|
||||
}
|
||||
};
|
||||
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone());
|
||||
|
||||
@@ -252,6 +261,7 @@ impl IndexWriter {
|
||||
let mut document_iterator = document_receiver_clone.clone()
|
||||
.into_iter()
|
||||
.peekable();
|
||||
|
||||
// the peeking here is to avoid
|
||||
// creating a new segment's files
|
||||
// if no document are available.
|
||||
@@ -269,8 +279,8 @@ impl IndexWriter {
|
||||
// was dropped.
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let segment = index.new_segment(opstamp);
|
||||
|
||||
let segment = index.new_segment();
|
||||
let valid_generation = index_documents(&mut heap,
|
||||
segment,
|
||||
&schema,
|
||||
|
||||
@@ -120,11 +120,20 @@ mod tests {
|
||||
assert!(result_list.is_empty());
|
||||
}
|
||||
|
||||
fn seg_meta(num_docs: u32) -> SegmentMeta {
|
||||
SegmentMeta {
|
||||
segment_id: SegmentId::generate_random(),
|
||||
num_docs: num_docs,
|
||||
num_deleted_docs: 0u32,
|
||||
delete_opstamp: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_log_merge_policy_pair() {
|
||||
let test_input = vec![SegmentMeta::new(SegmentId::generate_random(), 10),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 10),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 10)];
|
||||
let test_input = vec![seg_meta(10),
|
||||
seg_meta(10),
|
||||
seg_meta(10)];
|
||||
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
|
||||
assert_eq!(result_list.len(), 1);
|
||||
}
|
||||
@@ -132,12 +141,12 @@ mod tests {
|
||||
#[test]
|
||||
fn test_log_merge_policy_levels() {
|
||||
// multiple levels all get merged correctly
|
||||
let test_input = vec![SegmentMeta::new(SegmentId::generate_random(), 10),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 10),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 10),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 1000),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 1000),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 1000)];
|
||||
let test_input = vec![seg_meta(10),
|
||||
seg_meta(10),
|
||||
seg_meta(10),
|
||||
seg_meta(1000),
|
||||
seg_meta(1000),
|
||||
seg_meta(1000)];
|
||||
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
|
||||
assert_eq!(result_list.len(), 2);
|
||||
}
|
||||
@@ -145,24 +154,24 @@ mod tests {
|
||||
#[test]
|
||||
fn test_log_merge_policy_within_levels() {
|
||||
// multiple levels all get merged correctly
|
||||
let test_input = vec![SegmentMeta::new(SegmentId::generate_random(), 10),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 11),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 12),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 1000),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 1000),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 1000)];
|
||||
let test_input = vec![seg_meta(10),
|
||||
seg_meta(11),
|
||||
seg_meta(12),
|
||||
seg_meta(1000),
|
||||
seg_meta(1000),
|
||||
seg_meta(1000)];
|
||||
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
|
||||
assert_eq!(result_list.len(), 2);
|
||||
}
|
||||
#[test]
|
||||
fn test_log_merge_policy_small_segments() {
|
||||
// multiple levels all get merged correctly
|
||||
let test_input = vec![SegmentMeta::new(SegmentId::generate_random(), 1),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 1),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 1),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 2),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 2),
|
||||
SegmentMeta::new(SegmentId::generate_random(), 2)];
|
||||
let test_input = vec![seg_meta(1),
|
||||
seg_meta(1),
|
||||
seg_meta(1),
|
||||
seg_meta(2),
|
||||
seg_meta(2),
|
||||
seg_meta(2)];
|
||||
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
|
||||
assert_eq!(result_list.len(), 1);
|
||||
}
|
||||
|
||||
@@ -195,14 +195,14 @@ mod tests {
|
||||
let segment_id_merged = SegmentId::generate_random();
|
||||
|
||||
{
|
||||
let segment_meta = SegmentMeta::new(segment_id_a, 10);
|
||||
let segment_meta = SegmentMeta::new(segment_id_a);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state, SegmentState::Ready);
|
||||
assert_eq!(segment_register.segment_ids(), vec!(segment_id_a));
|
||||
{
|
||||
let segment_meta = SegmentMeta::new(segment_id_b, 20);
|
||||
let segment_meta = SegmentMeta::new(segment_id_b);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
@@ -214,7 +214,7 @@ mod tests {
|
||||
segment_register.remove_segment(&segment_id_a);
|
||||
segment_register.remove_segment(&segment_id_b);
|
||||
{
|
||||
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 10 + 20);
|
||||
let segment_meta_merged = SegmentMeta::new(segment_id_merged);
|
||||
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor());
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
|
||||
@@ -177,13 +177,13 @@ impl SegmentUpdater {
|
||||
fn purge_deletes(&self, target_opstamp: u64) -> Result<()> {
|
||||
let uncommitted = self.0.segment_manager.segment_entries();
|
||||
for mut segment_entry in uncommitted {
|
||||
let mut segment = self.0.index.segment(segment_entry.meta());
|
||||
let (_, deleted_docset) = advance_deletes(
|
||||
let mut segment = self.0.index.segment(segment_entry.meta().clone());
|
||||
if let Some((_, deleted_docset)) = advance_deletes(
|
||||
&segment,
|
||||
segment_entry.delete_cursor(),
|
||||
DocToOpstampMapping::None).unwrap();
|
||||
{
|
||||
let mut delete_file = segment.with_opstamp(target_opstamp).open_write(SegmentComponent::DELETE)?;
|
||||
DocToOpstampMapping::None).unwrap()
|
||||
{
|
||||
let mut delete_file = segment.with_delete_opstamp(target_opstamp).open_write(SegmentComponent::DELETE)?;
|
||||
write_delete_bitset(&deleted_docset, &mut delete_file)?;
|
||||
}
|
||||
}
|
||||
@@ -237,20 +237,16 @@ impl SegmentUpdater {
|
||||
|
||||
let segments: Vec<Segment> = segment_metas
|
||||
.iter()
|
||||
.map(|ref segment_meta| index.segment(segment_meta))
|
||||
.cloned()
|
||||
.map(|segment_meta| index.segment(segment_meta))
|
||||
.collect();
|
||||
|
||||
// An IndexMerger is like a "view" of our merged segments.
|
||||
// TODO unwrap
|
||||
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed");
|
||||
|
||||
let opstamp = segment_metas
|
||||
.iter()
|
||||
.map(|meta| meta.opstamp)
|
||||
.max()
|
||||
.unwrap();
|
||||
|
||||
let mut merged_segment = index.new_segment(opstamp);
|
||||
let mut merged_segment = index.new_segment();
|
||||
|
||||
// ... we just serialize this index merger in our new segment
|
||||
// to merge the two segments.
|
||||
@@ -260,7 +256,7 @@ impl SegmentUpdater {
|
||||
segment_id: merged_segment.id(),
|
||||
num_docs: num_docs,
|
||||
num_deleted_docs: 0u32,
|
||||
opstamp: opstamp,
|
||||
delete_opstamp: None, // TODO fix delete_opstamp
|
||||
};
|
||||
|
||||
// TODO fix delete cursor
|
||||
|
||||
@@ -61,7 +61,7 @@ mod tests {
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut segment = index.new_segment(0u64);
|
||||
let mut segment = index.new_segment();
|
||||
let mut posting_serializer = PostingsSerializer::open(&mut segment).unwrap();
|
||||
let term = Term::from_field_text(text_field, "abc");
|
||||
posting_serializer.new_term(&term, 3).unwrap();
|
||||
@@ -81,7 +81,7 @@ mod tests {
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema.clone());
|
||||
let segment = index.new_segment(0u64);
|
||||
let segment = index.new_segment();
|
||||
let heap = Heap::with_capacity(10_000_000);
|
||||
{
|
||||
let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user