Compare commits

...

16 Commits

Author SHA1 Message Date
Paul Masurel
4c846b1202 Added NRT directory kinda working 2019-04-05 10:07:29 +09:00
Paul Masurel
fac0013454 Added flush 2019-04-04 09:36:06 +09:00
Paul Masurel
95db5d9999 Merge branch 'master' into softcommits 2019-03-23 18:08:07 +09:00
Paul Masurel
7f0372fa97 reader 2019-02-16 16:09:16 +09:00
Paul Masurel
f8fdf68fcb unit test 2019-02-16 15:49:22 +09:00
Paul Masurel
c00e95cd04 Uncommited is not SegmentRegisters 2019-02-15 22:44:14 +09:00
Paul Masurel
a623d8f6d9 Added SegmentAvailable readonly view 2019-02-15 08:58:08 +09:00
Paul Masurel
b3ede2dd7e softcommits 2019-02-13 21:29:54 +09:00
Paul Masurel
b68686f040 opstamp constraint 2019-02-12 18:14:07 +09:00
Paul Masurel
629d3fb37f Added opstamp 2019-02-12 08:49:23 +09:00
Paul Masurel
f513f10e05 fmt 2019-02-08 15:04:35 +09:00
Paul Masurel
f262d4cc22 code cleaning 2019-02-08 14:54:34 +09:00
Paul Masurel
91e89714f4 Added soft commits 2019-02-08 14:42:52 +09:00
Paul Masurel
6fd3cb1254 Renaming 2019-02-06 05:48:15 +01:00
Paul Masurel
549b4e66e5 Using the new API 2019-02-06 00:17:56 +01:00
Paul Masurel
d9b2bf98e2 First stab 2019-02-05 21:23:07 +01:00
13 changed files with 550 additions and 113 deletions

View File

@@ -205,6 +205,16 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
/// `OnCommit` `ReloadPolicy` to work properly.
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle;
/// Ensure that all volatile files are persisted (in directories where that makes sense).
///
/// In order to make Near Real Time efficient, tantivy introduced the notion of soft_commit vs
/// commit. Commit will call `.flush()`, while softcommit won't.
///
/// `meta.json` should be the last file to be flushed.
fn flush(&self) -> io::Result<()> {
Ok(())
}
}
/// DirectoryClone

View File

@@ -368,7 +368,7 @@ impl Drop for ReleaseLockFile {
/// This Write wraps a File, but has the specificity of
/// call `sync_all` on flush.
struct SafeFileWriter(File);
pub struct SafeFileWriter(File);
impl SafeFileWriter {
fn new(file: File) -> SafeFileWriter {

View File

@@ -13,6 +13,7 @@ mod managed_directory;
mod ram_directory;
mod read_only_source;
mod watch_event_router;
mod nrt_directory;
/// Errors specific to the directory module.
pub mod error;

View File

@@ -0,0 +1,195 @@
use directory::Directory;
use std::path::{PathBuf, Path};
use directory::ReadOnlySource;
use directory::error::OpenReadError;
use directory::error::DeleteError;
use std::io::{BufWriter, Cursor};
use directory::SeekableWrite;
use directory::error::OpenWriteError;
use directory::WatchHandle;
use directory::ram_directory::InnerRamDirectory;
use std::sync::RwLock;
use std::sync::Arc;
use directory::WatchCallback;
use std::fmt;
use std::io;
use std::io::{Seek, Write};
use directory::DirectoryClone;
/// Threshold (in bytes) above which a file stops being buffered in RAM
/// and is migrated to the underlying directory.
const BUFFER_LEN: usize = 1_000_000;
/// Writer handed out by the NRT directory.
///
/// A file starts its life buffered in RAM (`InRam`). If it grows beyond
/// `BUFFER_LEN`, the writer switches to a writer on the underlying
/// directory (`UnderlyingFile`). The switch happens in `write_all`.
pub enum NRTWriter {
    InRam {
        // In-RAM buffer accumulating the bytes written so far.
        buffer: Cursor<Vec<u8>>,
        // Path of the file being written.
        path: PathBuf,
        // Handle back to the directory: used to publish the buffer into the
        // RAM cache on `flush`, and to migrate the file when it grows too large.
        nrt_directory: NRTDirectory
    },
    // Writer on the underlying directory, used once the file outgrew the
    // in-RAM buffer.
    UnderlyingFile(BufWriter<Box<SeekableWrite>>)
}
impl NRTWriter {
    /// Creates a writer that initially accumulates the written bytes in RAM.
    pub fn new(path: PathBuf, nrt_directory: NRTDirectory) -> NRTWriter {
        let buffer = Cursor::new(Vec::with_capacity(BUFFER_LEN));
        NRTWriter::InRam {
            nrt_directory,
            path,
            buffer,
        }
    }
}
impl io::Seek for NRTWriter {
    /// Seeks within the in-RAM buffer, or delegates to the underlying file
    /// writer once the file has been migrated.
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        match self {
            // `path` and `nrt_directory` are irrelevant to seeking: binding
            // them here only triggered unused-variable warnings.
            NRTWriter::InRam { buffer, .. } => buffer.seek(pos),
            NRTWriter::UnderlyingFile(file) => file.seek(pos),
        }
    }
}
impl io::Write for NRTWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // `write_all` handles the possible migration to the underlying
        // directory; on success the whole buffer was consumed.
        self.write_all(buf)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        match self {
            NRTWriter::InRam { buffer, path, nrt_directory } => {
                // Publish the current content of the buffer into the RAM
                // cache, making it visible to readers of the cache.
                let mut cache_wlock = nrt_directory.cache.write().unwrap();
                cache_wlock.write(path.clone(), buffer.get_ref());
                Ok(())
            }
            NRTWriter::UnderlyingFile(file) => file.flush(),
        }
    }

    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        // Working around the borrow checker: we cannot reassign `*self`
        // while its fields are still borrowed by the pattern match.
        let mut underlying_write_opt: Option<BufWriter<Box<SeekableWrite>>> = None;
        if let NRTWriter::InRam { buffer, path, nrt_directory } = self {
            if buffer.get_ref().len() + buf.len() > BUFFER_LEN {
                // We can't keep this file in RAM. Let's move it to the
                // underlying directory.
                //
                // We must target `underlying` directly: going through
                // `nrt_directory.open_write` would always fail with
                // `FileAlreadyExists`, since the path was registered in the
                // RAM cache when this writer was created.
                let mut underlying_write = nrt_directory
                    .underlying
                    .open_write(path)
                    .map_err(|open_err| {
                        io::Error::new(io::ErrorKind::Other, open_err)
                    })?;
                // Replay the bytes accumulated in RAM so far, so that no data
                // is lost by the migration.
                // NOTE(review): this assumes the cursor was only ever written
                // sequentially (position at end of buffer) — confirm no caller
                // seeks backward before the spill.
                underlying_write.write_all(buffer.get_ref())?;
                underlying_write_opt = Some(underlying_write);
            }
        }
        if let Some(underlying_write) = underlying_write_opt {
            *self = NRTWriter::UnderlyingFile(underlying_write);
        }
        match self {
            NRTWriter::InRam { buffer, .. } => {
                assert!(buffer.get_ref().len() + buf.len() <= BUFFER_LEN);
                buffer.write_all(buf)
            }
            NRTWriter::UnderlyingFile(file) => file.write_all(buf),
        }
    }
}
/// Directory that fronts an underlying directory with a shared in-RAM cache.
///
/// Freshly-written files are accumulated in RAM (see `NRTWriter`) and
/// published into `cache` on flush; larger files land in `underlying`.
pub struct NRTDirectory {
    // The real (persistent) directory being wrapped.
    underlying: Box<Directory>,
    // RAM cache, shared between all clones of this directory.
    cache: Arc<RwLock<InnerRamDirectory>>,
}
// `Clone` is implemented manually because `Box<Directory>` is not `Clone`:
// we go through `Directory::box_clone` instead. The RAM cache is shared
// (`Arc::clone`) between all clones.
impl Clone for NRTDirectory {
    fn clone(&self) -> Self {
        NRTDirectory {
            underlying: self.underlying.box_clone(),
            cache: self.cache.clone()
        }
    }
}
impl NRTDirectory {
    /// Wraps an existing directory, putting an (initially empty) in-RAM
    /// cache in front of it.
    fn wrap(underlying: Box<Directory>) -> NRTDirectory {
        let cache = Default::default();
        NRTDirectory { underlying, cache }
    }
}
impl fmt::Debug for NRTDirectory {
    /// Shows the wrapped directory; the RAM cache content is omitted.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        write!(formatter, "NRTDirectory({:?})", self.underlying)
    }
}
impl Directory for NRTDirectory {
    /// Reads are not implemented yet.
    // TODO(review): presumably this should consult the RAM cache first and
    // then fall back to the underlying directory — confirm intended semantics.
    fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
        unimplemented!()
    }

    fn delete(&self, path: &Path) -> Result<(), DeleteError> {
        // We explicitly release the lock, to prevent a panic on the underlying directory
        // to poison the lock.
        //
        // File can only go from cache to underlying so the result does not lead to
        // any inconsistency.
        {
            let mut cache_wlock = self.cache.write().unwrap();
            if cache_wlock.exists(path) {
                // The file is still in the RAM cache: delete it there.
                return cache_wlock.delete(path);
            }
        }
        self.underlying.delete(path)
    }

    fn exists(&self, path: &Path) -> bool {
        // We explicitly release the lock, to prevent a panic on the underlying directory
        // to poison the lock.
        //
        // File can only go from cache to underlying so the result does not lead to
        // any inconsistency.
        {
            let rlock_cache = self.cache.read().unwrap();
            if rlock_cache.exists(path) {
                return true;
            }
        }
        self.underlying.exists(path)
    }

    /// Opens a new file for write.
    ///
    /// The file is registered (empty) in the RAM cache right away, and the
    /// returned writer buffers its content in RAM (see `NRTWriter`).
    fn open_write(&mut self, path: &Path) -> Result<BufWriter<Box<SeekableWrite>>, OpenWriteError> {
        let mut cache_wlock = self.cache.write().unwrap();
        // TODO might poison our lock. I don't have a sound solution yet.
        let path_buf = path.to_owned();
        if self.underlying.exists(path) {
            return Err(OpenWriteError::FileAlreadyExists(path_buf));
        }
        let exists = cache_wlock.write(path_buf.clone(), &[]);
        // force the creation of the file to mimic the MMap directory.
        if exists {
            Err(OpenWriteError::FileAlreadyExists(path_buf))
        } else {
            let vec_writer = NRTWriter::new(path_buf.clone(), self.clone());
            Ok(BufWriter::new(Box::new(vec_writer)))
        }
    }

    // Atomic reads and writes (e.g. `meta.json`) bypass the RAM cache and go
    // straight to the underlying directory.
    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
        self.underlying.atomic_read(path)
    }

    fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
        self.underlying.atomic_write(path, data)
    }

    // File watching is delegated to the underlying directory.
    fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
        self.underlying.watch(watch_callback)
    }
}

View File

@@ -71,36 +71,36 @@ impl Write for VecWriter {
}
#[derive(Default)]
struct InnerDirectory {
pub(crate) struct InnerRamDirectory {
fs: HashMap<PathBuf, ReadOnlySource>,
watch_router: WatchCallbackList,
}
impl InnerDirectory {
fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
impl InnerRamDirectory {
pub fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
let data = ReadOnlySource::new(Vec::from(data));
self.fs.insert(path, data).is_some()
}
fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
pub fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
self.fs
.get(path)
.ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
.map(|el| el.clone())
}
fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> {
pub fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> {
match self.fs.remove(path) {
Some(_) => Ok(()),
None => Err(DeleteError::FileDoesNotExist(PathBuf::from(path))),
}
}
fn exists(&self, path: &Path) -> bool {
pub fn exists(&self, path: &Path) -> bool {
self.fs.contains_key(path)
}
fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
pub fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
self.watch_router.subscribe(watch_handle)
}
}
@@ -118,7 +118,7 @@ impl fmt::Debug for RAMDirectory {
///
#[derive(Clone, Default)]
pub struct RAMDirectory {
fs: Arc<RwLock<InnerDirectory>>,
fs: Arc<RwLock<InnerRamDirectory>>,
}
impl RAMDirectory {

View File

@@ -179,6 +179,11 @@ pub struct DeleteCursor {
}
impl DeleteCursor {
pub fn empty() -> DeleteCursor {
DeleteQueue::new().cursor()
}
/// Skips operations and position it so that
/// - either all of the delete operation currently in the
/// queue are consume and the next get will return None.

View File

@@ -259,7 +259,7 @@ pub fn advance_deletes(
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
}
segment_entry.set_meta(segment.meta().clone());
segment_entry.set_meta(target_opstamp, segment.meta().clone());
Ok(())
}
@@ -326,7 +326,12 @@ fn index_documents(
// to even open the segment.
None
};
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
let segment_entry = SegmentEntry::new(
segment_meta,
delete_cursor,
delete_bitset_opt,
last_docstamp,
);
Ok(segment_updater.add_segment(generation, segment_entry))
}
@@ -361,9 +366,9 @@ impl IndexWriter {
}
#[doc(hidden)]
pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
pub fn add_segment(&mut self, segment_meta: SegmentMeta, opstamp: u64) {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None, opstamp);
self.segment_updater
.add_segment(self.generation, segment_entry);
}
@@ -527,7 +532,7 @@ impl IndexWriter {
//
// This will reach an end as the only document_sender
// was dropped with the index_writer.
for _ in document_receiver.clone() {}
for _ in document_receiver.iter() {}
Ok(())
}
@@ -554,6 +559,16 @@ impl IndexWriter {
/// using this API.
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
pub fn prepare_commit(&mut self) -> Result<PreparedCommit> {
info!("Preparing commit");
self.prepare_commit_internal(false)
}
pub fn prepare_commit_soft(&mut self) -> Result<PreparedCommit> {
info!("Preparing soft commit");
self.prepare_commit_internal(true)
}
pub(crate) fn prepare_commit_internal(&mut self, soft: bool) -> Result<PreparedCommit> {
// Here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
@@ -576,13 +591,13 @@ impl IndexWriter {
let indexing_worker_result = worker_handle
.join()
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
indexing_worker_result?;
// add a new worker for the next generation.
// add a new worker for the next generation, whether the worker failed or not.
self.add_indexing_worker()?;
indexing_worker_result?;
}
let commit_opstamp = self.stamper.stamp();
let prepared_commit = PreparedCommit::new(self, commit_opstamp);
let prepared_commit = PreparedCommit::new(self, commit_opstamp, soft);
info!("Prepared commit {}", commit_opstamp);
Ok(prepared_commit)
}
@@ -605,6 +620,11 @@ impl IndexWriter {
self.prepare_commit()?.commit()
}
pub fn soft_commit(&mut self) -> Result<u64> {
self.prepare_commit_soft()?.commit()
}
pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
&self.segment_updater
}
@@ -732,6 +752,7 @@ mod tests {
use Index;
use ReloadPolicy;
use Term;
use IndexReader;
#[test]
fn test_operations_group() {
@@ -865,6 +886,13 @@ mod tests {
let _index_writer_two = index.writer(3_000_000).unwrap();
}
fn num_docs_containing_text(reader: &IndexReader, term: &str) -> u64 {
let searcher = reader.searcher();
let text_field = reader.schema().get_field("text").unwrap();
let term = Term::from_field_text(text_field, term);
searcher.doc_freq(&term)
}
#[test]
fn test_commit_and_rollback() {
let mut schema_builder = schema::Schema::builder();
@@ -881,9 +909,12 @@ mod tests {
searcher.doc_freq(&term)
};
let mut index_writer = index.writer(3_000_000).unwrap();
assert_eq!(index_writer.commit_opstamp(), 0u64);
assert_eq!(num_docs_containing_text(&reader, "a"), 0);
{
// writing the segment
let mut index_writer = index.writer(3_000_000).unwrap();
index_writer.add_document(doc!(text_field=>"a"));
index_writer.rollback().unwrap();
assert_eq!(index_writer.commit_opstamp(), 0u64);
@@ -902,6 +933,35 @@ mod tests {
reader.searcher();
}
#[test]
fn test_softcommit_and_rollback() {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let reader = index.reader().unwrap();
// writing the segment
let mut index_writer = index.writer(3_000_000).unwrap();
index_writer.add_document(doc!(text_field=>"a"));
index_writer.rollback().unwrap();
assert_eq!(index_writer.commit_opstamp(), 0u64);
assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
{
index_writer.add_document(doc!(text_field=>"b"));
index_writer.add_document(doc!(text_field=>"c"));
}
assert!(index_writer.soft_commit().is_ok());
reader.reload().unwrap(); // we need to load soft committed stuff.
assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
assert_eq!(num_docs_containing_text(&reader, "b"), 1u64);
assert_eq!(num_docs_containing_text(&reader, "c"), 1u64);
index_writer.rollback().unwrap();
reader.reload().unwrap();
assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
assert_eq!(num_docs_containing_text(&reader, "b"), 0u64);
assert_eq!(num_docs_containing_text(&reader, "c"), 0u64);
}
#[test]
fn test_with_merges() {
let mut schema_builder = schema::Schema::builder();
@@ -935,7 +995,7 @@ mod tests {
reader.reload().unwrap();
assert_eq!(num_docs_containing("a"), 200);
assert_eq!(num_docs_containing_text(&reader, "a"), 200);
assert!(index.searchable_segments().unwrap().len() < 8);
}
}
@@ -978,7 +1038,7 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let reader = index.reader();
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();

View File

@@ -6,14 +6,20 @@ pub struct PreparedCommit<'a> {
index_writer: &'a mut IndexWriter,
payload: Option<String>,
opstamp: u64,
soft: bool,
}
impl<'a> PreparedCommit<'a> {
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
pub(crate) fn new(
index_writer: &'a mut IndexWriter,
opstamp: u64,
soft: bool,
) -> PreparedCommit {
PreparedCommit {
index_writer,
payload: None,
opstamp,
soft,
}
}
@@ -33,7 +39,7 @@ impl<'a> PreparedCommit<'a> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()
.commit(self.opstamp, self.payload)?;
.commit(self.opstamp, self.payload, self.soft)?;
Ok(self.opstamp)
}
}

View File

@@ -22,6 +22,7 @@ pub struct SegmentEntry {
meta: SegmentMeta,
delete_bitset: Option<BitSet>,
delete_cursor: DeleteCursor,
opstamp: u64,
}
impl SegmentEntry {
@@ -30,14 +31,20 @@ impl SegmentEntry {
segment_meta: SegmentMeta,
delete_cursor: DeleteCursor,
delete_bitset: Option<BitSet>,
opstamp: u64,
) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
delete_bitset,
delete_cursor,
opstamp,
}
}
pub fn opstamp(&self) -> u64 {
self.opstamp
}
/// Return a reference to the segment entry deleted bitset.
///
/// `DocId` in this bitset are flagged as deleted.
@@ -46,7 +53,8 @@ impl SegmentEntry {
}
/// Set the `SegmentMeta` for this segment.
pub fn set_meta(&mut self, segment_meta: SegmentMeta) {
pub fn set_meta(&mut self, opstamp: u64, segment_meta: SegmentMeta) {
self.opstamp = opstamp;
self.meta = segment_meta;
}

View File

@@ -11,11 +11,47 @@ use std::path::PathBuf;
use std::sync::RwLock;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use Result as TantivyResult;
use std::sync::Arc;
use std::collections::HashMap;
/// Provides a read-only view of the available segments.
#[derive(Clone)]
pub struct AvailableSegments {
registers: Arc<RwLock<SegmentRegisters>>,
}
impl AvailableSegments {
pub fn committed(&self) -> Vec<SegmentMeta> {
self.registers
.read()
.unwrap()
.committed
.segment_metas()
}
pub fn soft_committed(&self) -> Vec<SegmentMeta> {
self.registers
.read()
.unwrap()
.soft_committed
.segment_metas()
}
}
#[derive(Default)]
struct SegmentRegisters {
uncommitted: SegmentRegister,
uncommitted: HashMap<SegmentId, SegmentEntry>,
committed: SegmentRegister,
/// soft commits can advance committed segment to a future delete
/// opstamp.
///
/// In that case the same `SegmentId` can appear in both `committed`
/// and in `committed_in_the_future`.
///
/// We do not consider these segments for merges.
soft_committed: SegmentRegister,
/// `DeleteCursor`, positioned on the soft commit.
delete_cursor: DeleteCursor,
}
/// The segment manager stores the list of segments
@@ -23,9 +59,8 @@ struct SegmentRegisters {
///
/// It guarantees the atomicity of the
/// changes (merges especially)
#[derive(Default)]
pub struct SegmentManager {
registers: RwLock<SegmentRegisters>,
registers: Arc<RwLock<SegmentRegisters>>
}
impl Debug for SegmentManager {
@@ -46,11 +81,17 @@ pub fn get_mergeable_segments(
let registers_lock = segment_manager.read();
(
registers_lock
.committed
.soft_committed
.get_mergeable_segments(in_merge_segment_ids),
registers_lock
.uncommitted
.get_mergeable_segments(in_merge_segment_ids),
.values()
.map(|segment_entry| segment_entry.meta())
.filter(|segment_meta| {
!in_merge_segment_ids.contains(&segment_meta.id())
})
.cloned()
.collect::<Vec<_>>()
)
}
@@ -58,21 +99,22 @@ impl SegmentManager {
pub fn from_segments(
segment_metas: Vec<SegmentMeta>,
delete_cursor: &DeleteCursor,
opstamp: u64,
) -> SegmentManager {
SegmentManager {
registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(segment_metas, delete_cursor),
}),
registers: Arc::new(RwLock::new(SegmentRegisters {
uncommitted: HashMap::default(),
committed: SegmentRegister::new(segment_metas.clone(), opstamp),
soft_committed: SegmentRegister::new(segment_metas, opstamp),
delete_cursor: delete_cursor.clone(),
}))
}
}
/// Returns all of the segment entries (committed or uncommitted)
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
let registers_lock = self.read();
let mut segment_entries = registers_lock.uncommitted.segment_entries();
segment_entries.extend(registers_lock.committed.segment_entries());
segment_entries
pub fn available_segments_view(&self) -> AvailableSegments {
AvailableSegments {
registers: self.registers.clone()
}
}
/// List the files that are useful to the index.
@@ -108,44 +150,76 @@ impl SegmentManager {
let mut registers_lock = self.write();
registers_lock
.committed
.segment_entries()
.segment_metas()
.iter()
.filter(|segment| segment.meta().num_docs() == 0)
.for_each(|segment| {
.filter(|segment_meta| segment_meta.num_docs() == 0)
.for_each(|segment_meta| {
registers_lock
.committed
.remove_segment(&segment.segment_id())
.remove_segment(&segment_meta.id())
});
registers_lock
.soft_committed
.segment_metas()
.iter()
.filter(|segment_meta| segment_meta.num_docs() == 0)
.for_each(|segment_meta| {
registers_lock
.committed
.remove_segment(&segment_meta.id())
});
}
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
registers_lock.committed.clear();
registers_lock.uncommitted.clear();
for segment_entry in segment_entries {
registers_lock.committed.add_segment_entry(segment_entry);
}
/// Returns all of the segment entries (soft committed or uncommitted)
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
let registers_lock = self.read();
let mut segment_entries: Vec<SegmentEntry > = registers_lock.uncommitted.values().cloned().collect();
segment_entries.extend(registers_lock.soft_committed.segment_entries(&registers_lock.delete_cursor).into_iter());
segment_entries
}
/// Marks a list of segments as in merge.
pub fn commit(&self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
registers_lock.uncommitted.clear();
registers_lock
.committed
.set_commit(opstamp, segment_entries.clone());
registers_lock
.soft_committed
.set_commit(opstamp, segment_entries);
registers_lock.delete_cursor.skip_to(opstamp);
}
pub fn soft_commit(&self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
registers_lock.uncommitted.clear();
registers_lock
.soft_committed
.set_commit(opstamp, segment_entries);
registers_lock.delete_cursor.skip_to(opstamp);
}
/// Gets the list of segment_entries associated to a list of `segment_ids`.
/// This method is used when starting a merge operations.
///
/// Returns an error if some segments are missing, or if
/// the `segment_ids` are not either all committed or all
/// the `segment_ids` are not either all soft_committed or all
/// uncommitted.
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
let registers_lock = self.read();
let mut segment_entries = vec![];
if registers_lock.uncommitted.contains_all(segment_ids) {
if segment_ids.iter().all(|segment_id| registers_lock.uncommitted.contains_key(segment_id)) {
for segment_id in segment_ids {
let segment_entry = registers_lock.uncommitted
.get(segment_id)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
segment_entries.push(segment_entry.clone());
}
} else if registers_lock.committed.contains_all(segment_ids) {
} else if registers_lock.soft_committed.contains_all(segment_ids) {
for segment_id in segment_ids {
let segment_entry = registers_lock.committed
.get(segment_id)
let segment_entry = registers_lock.soft_committed
.get(segment_id, &registers_lock.delete_cursor)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
}
@@ -160,35 +234,32 @@ impl SegmentManager {
pub fn add_segment(&self, segment_entry: SegmentEntry) {
let mut registers_lock = self.write();
registers_lock.uncommitted.add_segment_entry(segment_entry);
registers_lock
.uncommitted
.insert(segment_entry.segment_id(), segment_entry);
}
pub fn end_merge(
&self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_entry: SegmentEntry,
after_merge_segment_entry: SegmentEntry
) {
let mut registers_lock = self.write();
let target_register: &mut SegmentRegister = {
if registers_lock
if before_merge_segment_ids.iter().all(|seg_id|
registers_lock
.uncommitted
.contains_all(before_merge_segment_ids)
{
&mut registers_lock.uncommitted
} else if registers_lock
.committed
.contains_all(before_merge_segment_ids)
{
&mut registers_lock.committed
} else {
warn!("couldn't find segment in SegmentManager");
return;
.contains_key(seg_id))
{
for segment_id in before_merge_segment_ids {
registers_lock.uncommitted.remove(&segment_id);
}
};
for segment_id in before_merge_segment_ids {
target_register.remove_segment(segment_id);
registers_lock.uncommitted.insert(after_merge_segment_entry.segment_id(),
after_merge_segment_entry);
} else {
registers_lock.committed.receive_merge(&before_merge_segment_ids, &after_merge_segment_entry);
registers_lock.soft_committed.receive_merge(&before_merge_segment_ids, &after_merge_segment_entry)
}
target_register.add_segment_entry(after_merge_segment_entry);
}
pub fn committed_segment_metas(&self) -> Vec<SegmentMeta> {

View File

@@ -16,7 +16,8 @@ use std::fmt::{self, Debug, Formatter};
/// merge candidates.
#[derive(Default)]
pub struct SegmentRegister {
segment_states: HashMap<SegmentId, SegmentEntry>,
segment_states: HashMap<SegmentId, SegmentMeta>,
opstamp_constraint: u64,
}
impl Debug for SegmentRegister {
@@ -41,23 +42,28 @@ impl SegmentRegister {
) -> Vec<SegmentMeta> {
self.segment_states
.values()
.filter(|segment_entry| !in_merge_segment_ids.contains(&segment_entry.segment_id()))
.map(|segment_entry| segment_entry.meta().clone())
.filter(|segment_meta| !in_merge_segment_ids.contains(&segment_meta.id()))
.cloned()
.collect()
}
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
self.segment_states.values().cloned().collect()
}
pub fn segment_metas(&self) -> Vec<SegmentMeta> {
let mut segment_ids: Vec<SegmentMeta> = self
let mut segment_metas: Vec<SegmentMeta> = self
.segment_states
.values()
.map(|segment_entry| segment_entry.meta().clone())
.cloned()
.collect();
segment_ids.sort_by_key(|meta| meta.id());
segment_ids
segment_metas.sort_by_key(|meta| meta.id());
segment_metas
}
pub fn segment_entries(&self, delete_cursor: &DeleteCursor) -> Vec<SegmentEntry> {
self.segment_states
.values()
.map(|segment_meta| {
SegmentEntry::new(segment_meta.clone(), delete_cursor.clone(), None, self.opstamp_constraint)
})
.collect()
}
pub fn contains_all(&self, segment_ids: &[SegmentId]) -> bool {
@@ -66,27 +72,77 @@ impl SegmentRegister {
.all(|segment_id| self.segment_states.contains_key(segment_id))
}
pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) {
pub fn receive_merge(&mut self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_entry: &SegmentEntry) {
if after_merge_segment_entry.opstamp() != self.opstamp_constraint {
return;
}
if !self.contains_all(before_merge_segment_ids) {
return;
}
for segment_id in before_merge_segment_ids {
self.segment_states.remove(segment_id);
}
self.register_segment_entry(after_merge_segment_entry.clone());
}
/// Registers a `SegmentEntry`.
///
/// If a segment entry associated to this `SegmentId` is already there,
/// override it with the new `SegmentEntry`.
pub fn register_segment_entry(&mut self, segment_entry: SegmentEntry) {
if self.opstamp_constraint != segment_entry.opstamp() {
panic!(format!(
"Invalid segment. Expect opstamp {}, got {}.",
self.opstamp_constraint,
segment_entry.opstamp()
));
}
if segment_entry.meta().num_docs() == 0 {
return;
}
let segment_id = segment_entry.segment_id();
self.segment_states.insert(segment_id, segment_entry);
// Check that we are ok with deletes.
self.segment_states.insert(segment_id, segment_entry.meta().clone());
}
pub fn set_commit(&mut self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
self.segment_states.clear();
self.opstamp_constraint = opstamp;
for segment_entry in segment_entries {
self.register_segment_entry(segment_entry);
}
}
pub fn remove_segment(&mut self, segment_id: &SegmentId) {
self.segment_states.remove(segment_id);
self.segment_states.remove(&segment_id);
}
pub fn get(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states.get(segment_id).cloned()
pub fn get(&self, segment_id: &SegmentId, delete_cursor: &DeleteCursor) -> Option<SegmentEntry> {
self.segment_states
.get(&segment_id)
.map(|segment_meta|
SegmentEntry::new(
segment_meta.clone(),
delete_cursor.clone(),
None,
self.opstamp_constraint
))
}
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
pub fn new(
segment_metas: Vec<SegmentMeta>,
opstamp: u64,
) -> SegmentRegister {
let mut segment_states = HashMap::new();
for segment_meta in segment_metas {
let segment_id = segment_meta.id();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None);
segment_states.insert(segment_id, segment_entry);
segment_states.insert(segment_meta.id(), segment_meta);
}
SegmentRegister {
segment_states,
opstamp_constraint: opstamp,
}
SegmentRegister { segment_states }
}
}
@@ -115,22 +171,22 @@ mod tests {
let segment_id_merged = SegmentId::generate_random();
{
let segment_meta = SegmentMeta::new(segment_id_a, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
let segment_meta = SegmentMeta::new(segment_id_a, 1u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None, 0u64);
segment_register.register_segment_entry(segment_entry);
}
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
{
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
let segment_meta = SegmentMeta::new(segment_id_b, 2u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None, 0u64);
segment_register.register_segment_entry(segment_entry);
}
segment_register.remove_segment(&segment_id_a);
segment_register.remove_segment(&segment_id_b);
{
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32);
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 3u32);
let segment_entry =
SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None, 0u64);
segment_register.receive_merge(&[segment_id_a, segment_id_b], &segment_entry);
segment_register.register_segment_entry(segment_entry);
}
assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]);
}

View File

@@ -125,7 +125,7 @@ fn perform_merge(
let segment_meta = SegmentMeta::new(merged_segment.id(), num_docs);
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None);
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None, target_opstamp);
Ok(after_merge_segment_entry)
}
@@ -155,8 +155,11 @@ impl SegmentUpdater {
stamper: Stamper,
delete_cursor: &DeleteCursor,
) -> Result<SegmentUpdater> {
let index_meta = index.load_metas()?;
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let opstamp = index_meta.opstamp;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor, opstamp);
let pool = CpuPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)
@@ -280,14 +283,30 @@ impl SegmentUpdater {
.garbage_collect(|| self.0.segment_manager.list_files());
}
pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
pub fn commit(&self, opstamp: u64, payload: Option<String>, soft: bool) -> Result<()> {
self.run_async(move |segment_updater| {
if segment_updater.is_alive() {
let segment_entries = segment_updater
.purge_deletes(opstamp)
.expect("Failed purge deletes");
segment_updater.0.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload);
if soft {
// Soft commit.
//
// The list `segment_entries` above is what we might want to use as searchable
// segment. However, we do not want to mark them as committed, and we want
// to keep the current set of committed segment.
segment_updater.0.segment_manager.soft_commit(opstamp, segment_entries);
// ... We do not save the meta file.
} else {
// Hard_commit. We register the new segment entries as committed.
segment_updater
.0
.segment_manager
.commit(opstamp, segment_entries);
// TODO error handling.
segment_updater.save_metas(opstamp, payload);
segment_updater.0.index.directory().flush().unwrap();
}
segment_updater.garbage_collect_files_exec();
segment_updater.consider_merge_options();
}

View File

@@ -10,6 +10,7 @@ use Index;
use Result;
use Searcher;
use SegmentReader;
use schema::Schema;
/// Defines when a new version of the index should be reloaded.
///
@@ -158,6 +159,11 @@ pub struct IndexReader {
}
impl IndexReader {
pub fn schema(&self) -> Schema {
self.inner.index.schema()
}
/// Update searchers so that they reflect the state of the last
/// `.commit()`.
///