segment registers

This commit is contained in:
Paul Masurel
2019-12-31 15:03:28 +09:00
parent bbbf95018b
commit 2ba6a12ddc
3 changed files with 35 additions and 32 deletions

View File

@@ -16,6 +16,8 @@ use crate::fastfield::write_delete_bitset;
use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping;
use crate::indexer::operation::DeleteOperation;
use crate::indexer::segment_manager::SegmentRegisters;
use crate::indexer::segment_register::SegmentRegister;
use crate::indexer::stamper::Stamper;
use crate::indexer::MergePolicy;
use crate::indexer::SegmentEntry;
@@ -32,7 +34,7 @@ use smallvec::smallvec;
use smallvec::SmallVec;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread;
use std::thread::JoinHandle;
@@ -69,6 +71,8 @@ pub struct IndexWriter {
// lifetime of the lock with that of the IndexWriter.
_directory_lock: Option<DirectoryLock>,
segment_registers: Arc<RwLock<SegmentRegisters>>,
index: Index,
heap_size_in_bytes_per_thread: usize,
@@ -305,16 +309,24 @@ impl IndexWriter {
let delete_queue = DeleteQueue::new();
let current_opstamp = index.load_metas()?.opstamp;
let meta = index.load_metas()?;
let stamper = Stamper::new(current_opstamp);
let stamper = Stamper::new(meta.opstamp);
let commited_segments = SegmentRegister::new(
index.directory(),
&index.schema(),
meta.segments,
&delete_queue.cursor(),
);
let segment_registers = Arc::new(RwLock::new(SegmentRegisters::new(commited_segments)));
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
SegmentUpdater::create(segment_registers.clone(), index.clone(), stamper.clone())?;
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
segment_registers,
heap_size_in_bytes_per_thread,
index: index.clone(),
@@ -328,7 +340,7 @@ impl IndexWriter {
delete_queue,
committed_opstamp: current_opstamp,
committed_opstamp: meta.opstamp,
stamper,
worker_id: 0,

View File

@@ -1,19 +1,26 @@
use super::segment_register::SegmentRegister;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::indexer::delete_queue::DeleteCursor;
use crate::indexer::SegmentEntry;
use crate::Index;
use std::collections::hash_set::HashSet;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
#[derive(Default)]
struct SegmentRegisters {
pub(crate) struct SegmentRegisters {
uncommitted: SegmentRegister,
committed: SegmentRegister,
}
impl SegmentRegisters {
pub fn new(committed: SegmentRegister) -> SegmentRegisters {
SegmentRegisters {
uncommitted: Default::default(),
committed,
}
}
}
#[derive(PartialEq, Eq)]
pub(crate) enum SegmentsStatus {
Committed,
@@ -43,7 +50,7 @@ impl SegmentRegisters {
/// changes (merges especially)
#[derive(Default)]
pub struct SegmentManager {
registers: RwLock<SegmentRegisters>,
registers: Arc<RwLock<SegmentRegisters>>,
}
pub fn get_mergeable_segments(
@@ -62,22 +69,8 @@ pub fn get_mergeable_segments(
}
impl SegmentManager {
pub fn from_segments(
index: &Index,
segment_metas: Vec<SegmentMeta>,
delete_cursor: &DeleteCursor,
) -> SegmentManager {
SegmentManager {
registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(
index.directory(),
&index.schema(),
segment_metas,
delete_cursor,
),
}),
}
pub(crate) fn new(registers: Arc<RwLock<SegmentRegisters>>) -> SegmentManager {
SegmentManager { registers }
}
/// Returns all of the segment entries (committed or uncommitted)

View File

@@ -7,11 +7,10 @@ use crate::core::SegmentMeta;
use crate::core::SerializableSegment;
use crate::core::META_FILEPATH;
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::indexer::delete_queue::DeleteCursor;
use crate::indexer::index_writer::advance_deletes;
use crate::indexer::merge_operation::MergeOperationInventory;
use crate::indexer::merger::IndexMerger;
use crate::indexer::segment_manager::SegmentsStatus;
use crate::indexer::segment_manager::{SegmentRegisters, SegmentsStatus};
use crate::indexer::stamper::Stamper;
use crate::indexer::SegmentEntry;
use crate::indexer::SegmentSerializer;
@@ -164,12 +163,11 @@ pub(crate) struct InnerSegmentUpdater {
impl SegmentUpdater {
pub fn create(
segment_registers: Arc<RwLock<SegmentRegisters>>,
index: Index,
stamper: Stamper,
delete_cursor: &DeleteCursor,
) -> crate::Result<SegmentUpdater> {
let metas = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(&index, metas, delete_cursor);
let segment_manager = SegmentManager::new(segment_registers);
let pool = ThreadPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)