From 2ba6a12ddcf3f976913ca94d777082e0c2c6feca Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 31 Dec 2019 15:03:28 +0900 Subject: [PATCH] segment registers --- src/indexer/index_writer.rs | 24 +++++++++++++++++------ src/indexer/segment_manager.rs | 35 ++++++++++++++-------------------- src/indexer/segment_updater.rs | 8 +++----- 3 files changed, 35 insertions(+), 32 deletions(-) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 50d80bb3b..49eac0cde 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -16,6 +16,8 @@ use crate::fastfield::write_delete_bitset; use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue}; use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping; use crate::indexer::operation::DeleteOperation; +use crate::indexer::segment_manager::SegmentRegisters; +use crate::indexer::segment_register::SegmentRegister; use crate::indexer::stamper::Stamper; use crate::indexer::MergePolicy; use crate::indexer::SegmentEntry; @@ -32,7 +34,7 @@ use smallvec::smallvec; use smallvec::SmallVec; use std::mem; use std::ops::Range; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread; use std::thread::JoinHandle; @@ -69,6 +71,8 @@ pub struct IndexWriter { // lifetime of the lock with that of the IndexWriter. _directory_lock: Option, + segment_registers: Arc>, + index: Index, heap_size_in_bytes_per_thread: usize, @@ -305,16 +309,24 @@ impl IndexWriter { let delete_queue = DeleteQueue::new(); - let current_opstamp = index.load_metas()?.opstamp; + let meta = index.load_metas()?; - let stamper = Stamper::new(current_opstamp); + let stamper = Stamper::new(meta.opstamp); + + let commited_segments = SegmentRegister::new( + index.directory(), + &index.schema(), + meta.segments, + &delete_queue.cursor(), + ); + let segment_registers = Arc::new(RwLock::new(SegmentRegisters::new(commited_segments))); let segment_updater = - SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?; + SegmentUpdater::create(segment_registers.clone(), index.clone(), stamper.clone())?; let mut index_writer = IndexWriter { _directory_lock: Some(directory_lock), - + segment_registers, heap_size_in_bytes_per_thread, index: index.clone(), @@ -328,7 +340,7 @@ impl IndexWriter { delete_queue, - committed_opstamp: current_opstamp, + committed_opstamp: meta.opstamp, stamper, worker_id: 0, diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index b66ec9502..8344e472e 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -1,19 +1,26 @@ use super::segment_register::SegmentRegister; use crate::core::SegmentId; use crate::core::SegmentMeta; -use crate::indexer::delete_queue::DeleteCursor; use crate::indexer::SegmentEntry; -use crate::Index; use std::collections::hash_set::HashSet; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; #[derive(Default)] -struct SegmentRegisters { +pub(crate) struct SegmentRegisters { uncommitted: SegmentRegister, committed: SegmentRegister, } +impl SegmentRegisters { + pub fn new(committed: SegmentRegister) -> SegmentRegisters { + SegmentRegisters { + uncommitted: Default::default(), + committed, + } + } +} + #[derive(PartialEq, Eq)] pub(crate) enum SegmentsStatus { Committed, @@ -43,7 +50,7 @@ impl SegmentRegisters { /// changes (merges especially) #[derive(Default)] pub struct SegmentManager { - registers: RwLock, + registers: Arc>, } pub fn get_mergeable_segments( @@ -62,22 +69,8 @@ pub fn get_mergeable_segments( } impl SegmentManager { - pub fn from_segments( - index: &Index, - segment_metas: Vec, - delete_cursor: &DeleteCursor, - ) -> SegmentManager { - SegmentManager { - registers: RwLock::new(SegmentRegisters { - uncommitted: SegmentRegister::default(), - committed: SegmentRegister::new( - index.directory(), - &index.schema(), - segment_metas, - delete_cursor, - ), - }), - } + pub(crate) fn new(registers: Arc>) -> SegmentManager { + SegmentManager { registers } } /// Returns all of the segment entries (committed or uncommitted) diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 511664420..502b42c29 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -7,11 +7,10 @@ use crate::core::SegmentMeta; use crate::core::SerializableSegment; use crate::core::META_FILEPATH; use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult}; -use crate::indexer::delete_queue::DeleteCursor; use crate::indexer::index_writer::advance_deletes; use crate::indexer::merge_operation::MergeOperationInventory; use crate::indexer::merger::IndexMerger; -use crate::indexer::segment_manager::SegmentsStatus; +use crate::indexer::segment_manager::{SegmentRegisters, SegmentsStatus}; use crate::indexer::stamper::Stamper; use crate::indexer::SegmentEntry; use crate::indexer::SegmentSerializer; @@ -164,12 +163,11 @@ pub(crate) struct InnerSegmentUpdater { impl SegmentUpdater { pub fn create( + segment_registers: Arc>, index: Index, stamper: Stamper, - delete_cursor: &DeleteCursor, ) -> crate::Result { - let metas = index.searchable_segment_metas()?; - let segment_manager = SegmentManager::from_segments(&index, metas, delete_cursor); + let segment_manager = SegmentManager::new(segment_registers); let pool = ThreadPoolBuilder::new() .name_prefix("segment_updater") .pool_size(1)