mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-21 10:40:41 +00:00
Added persisted to Segment and SegmentEntry
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
use super::segment::create_segment;
|
||||
use super::segment::Segment;
|
||||
use crate::core::Executor;
|
||||
use crate::core::IndexMeta;
|
||||
@@ -331,9 +330,8 @@ impl Index {
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn segment(&self, segment_meta: SegmentMeta) -> Segment {
|
||||
create_segment(self.clone(), segment_meta)
|
||||
pub(crate) fn segment(&self, segment_meta: SegmentMeta) -> Segment {
|
||||
Segment::for_index(self.clone(), segment_meta)
|
||||
}
|
||||
|
||||
/// Creates a new segment.
|
||||
@@ -344,6 +342,13 @@ impl Index {
|
||||
self.segment(segment_meta)
|
||||
}
|
||||
|
||||
/// Creates a new segment.
|
||||
pub(crate) fn new_segment_unpersisted(&self) -> Segment {
|
||||
let meta = self
|
||||
.inventory
|
||||
.new_segment_meta(SegmentId::generate_random(), 0);
|
||||
Segment::new_volatile(meta, self.schema())
|
||||
}
|
||||
/// Return a reference to the index directory.
|
||||
pub fn directory(&self) -> &ManagedDirectory {
|
||||
&self.directory
|
||||
|
||||
@@ -3,51 +3,70 @@ use crate::core::Index;
|
||||
use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::directory::error::{OpenReadError, OpenWriteError};
|
||||
use crate::directory::{Directory, DirectoryClone};
|
||||
use crate::directory::{Directory, ManagedDirectory, RAMDirectory};
|
||||
use crate::directory::{ReadOnlySource, WritePtr};
|
||||
use crate::indexer::segment_serializer::SegmentSerializer;
|
||||
use crate::schema::Schema;
|
||||
use crate::Opstamp;
|
||||
use crate::Result;
|
||||
use failure::_core::ops::DerefMut;
|
||||
use std::fmt;
|
||||
use std::ops::Deref;
|
||||
use std::path::PathBuf;
|
||||
use std::result;
|
||||
|
||||
/// A segment is a piece of the index.
|
||||
pub struct Segment {
|
||||
schema: Schema,
|
||||
directory: Box<dyn Directory>,
|
||||
meta: SegmentMeta,
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum SegmentDirectory {
|
||||
Persisted(ManagedDirectory),
|
||||
Volatile(RAMDirectory),
|
||||
}
|
||||
|
||||
impl Clone for Segment {
|
||||
fn clone(&self) -> Self {
|
||||
Segment {
|
||||
schema: self.schema.clone(),
|
||||
directory: self.directory.box_clone(),
|
||||
meta: self.meta.clone(),
|
||||
impl SegmentDirectory {
|
||||
pub fn new_volatile() -> SegmentDirectory {
|
||||
SegmentDirectory::Volatile(RAMDirectory::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ManagedDirectory> for SegmentDirectory {
|
||||
fn from(directory: ManagedDirectory) -> Self {
|
||||
SegmentDirectory::Persisted(directory)
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for SegmentDirectory {
|
||||
type Target = dyn Directory;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match self {
|
||||
SegmentDirectory::Volatile(dir) => dir,
|
||||
SegmentDirectory::Persisted(dir) => dir,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for SegmentDirectory {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
match self {
|
||||
SegmentDirectory::Volatile(dir) => dir,
|
||||
SegmentDirectory::Persisted(dir) => dir,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A segment is a piece of the index.
|
||||
#[derive(Clone)]
|
||||
pub struct Segment {
|
||||
schema: Schema,
|
||||
meta: SegmentMeta,
|
||||
directory: SegmentDirectory,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Segment {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Segment({:?})", self.id().uuid_string())
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new segment given an `Index` and a `SegmentId`
|
||||
///
|
||||
/// The function is here to make it private outside `tantivy`.
|
||||
/// #[doc(hidden)]
|
||||
pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment {
|
||||
Segment {
|
||||
directory: index.directory().box_clone(),
|
||||
schema: index.schema(),
|
||||
meta,
|
||||
}
|
||||
}
|
||||
|
||||
impl Segment {
|
||||
/// Returns our index's schema.
|
||||
// TODO return a ref.
|
||||
@@ -55,6 +74,51 @@ impl Segment {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn new_persisted(
|
||||
meta: SegmentMeta,
|
||||
directory: ManagedDirectory,
|
||||
schema: Schema,
|
||||
) -> Segment {
|
||||
Segment {
|
||||
meta,
|
||||
schema,
|
||||
directory: SegmentDirectory::from(directory),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new segment that embeds its own `RAMDirectory`.
|
||||
///
|
||||
/// That segment is entirely dissociated from the index directory.
|
||||
/// It will be persisted by a background thread in charge of IO.
|
||||
pub fn new_unpersisted(meta: SegmentMeta, schema: Schema) -> Segment {
|
||||
Segment {
|
||||
schema,
|
||||
meta,
|
||||
directory: SegmentDirectory::new_volatile(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new segment given an `Index` and a `SegmentId`
|
||||
pub(crate) fn for_index(index: Index, meta: SegmentMeta) -> Segment {
|
||||
Segment {
|
||||
directory: SegmentDirectory::Persisted(index.directory().clone()),
|
||||
schema: index.schema(),
|
||||
meta,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn persist(&mut self, mut dest_directory: ManagedDirectory) -> crate::Result<()> {
|
||||
if let SegmentDirectory::Persisted(_) = self.directory {
|
||||
// this segment is already persisted.
|
||||
return Ok(());
|
||||
}
|
||||
if let SegmentDirectory::Volatile(ram_directory) = &self.directory {
|
||||
ram_directory.persist(&mut dest_directory)?;
|
||||
}
|
||||
self.directory = SegmentDirectory::Persisted(dest_directory);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the segment meta-information
|
||||
pub fn meta(&self) -> &SegmentMeta {
|
||||
&self.meta
|
||||
|
||||
@@ -144,6 +144,16 @@ impl RAMDirectory {
|
||||
pub fn total_mem_usage(&self) -> usize {
|
||||
self.fs.read().unwrap().total_mem_usage()
|
||||
}
|
||||
|
||||
pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> {
|
||||
let wlock = self.fs.write().unwrap();
|
||||
for (path, source) in wlock.fs.iter() {
|
||||
let mut dest_wrt = dest.open_write(path)?;
|
||||
dest_wrt.write_all(source.as_slice())?;
|
||||
dest_wrt.terminate()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Directory for RAMDirectory {
|
||||
|
||||
@@ -181,7 +181,7 @@ pub(crate) fn advance_deletes(
|
||||
delete_file.terminate()?;
|
||||
}
|
||||
|
||||
segment_entry.set_meta(segment.meta().clone());
|
||||
segment_entry.set_segment(segment);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -233,11 +233,7 @@ fn index_documents(
|
||||
last_docstamp,
|
||||
)?;
|
||||
|
||||
let segment_entry = SegmentEntry::new(
|
||||
segment_with_max_doc.meta().clone(),
|
||||
delete_cursor,
|
||||
delete_bitset_opt,
|
||||
);
|
||||
let segment_entry = SegmentEntry::new(segment_with_max_doc, delete_cursor, delete_bitset_opt);
|
||||
block_on(segment_updater.schedule_add_segment(segment_entry))?;
|
||||
Ok(true)
|
||||
}
|
||||
@@ -375,13 +371,6 @@ impl IndexWriter {
|
||||
result
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> {
|
||||
let delete_cursor = self.delete_queue.cursor();
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
|
||||
block_on(self.segment_updater.schedule_add_segment(segment_entry))
|
||||
}
|
||||
|
||||
/// Creates a new segment.
|
||||
///
|
||||
/// This method is useful only for users trying to do complex
|
||||
@@ -430,7 +419,7 @@ impl IndexWriter {
|
||||
// was dropped.
|
||||
return Ok(());
|
||||
}
|
||||
let segment = index.new_segment();
|
||||
let segment = index.new_segment_unpersisted();
|
||||
index_documents(
|
||||
mem_budget,
|
||||
segment,
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use crate::common::BitSet;
|
||||
use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::directory::ManagedDirectory;
|
||||
use crate::indexer::delete_queue::DeleteCursor;
|
||||
use crate::Segment;
|
||||
use std::fmt;
|
||||
|
||||
/// A segment entry describes the state of
|
||||
@@ -19,25 +21,31 @@ use std::fmt;
|
||||
/// in the .del file or in the `delete_bitset`.
|
||||
#[derive(Clone)]
|
||||
pub struct SegmentEntry {
|
||||
meta: SegmentMeta,
|
||||
segment: Segment,
|
||||
delete_bitset: Option<BitSet>,
|
||||
delete_cursor: DeleteCursor,
|
||||
}
|
||||
|
||||
impl SegmentEntry {
|
||||
/// Create a new `SegmentEntry`
|
||||
pub fn new(
|
||||
segment_meta: SegmentMeta,
|
||||
pub(crate) fn new(
|
||||
segment: Segment,
|
||||
delete_cursor: DeleteCursor,
|
||||
delete_bitset: Option<BitSet>,
|
||||
) -> SegmentEntry {
|
||||
SegmentEntry {
|
||||
meta: segment_meta,
|
||||
segment,
|
||||
delete_bitset,
|
||||
delete_cursor,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn persist(&mut self, dest_directory: ManagedDirectory) -> crate::Result<()> {
|
||||
// TODO take in account delete bitset?
|
||||
self.segment.persist(dest_directory)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return a reference to the segment entry deleted bitset.
|
||||
///
|
||||
/// `DocId` in this bitset are flagged as deleted.
|
||||
@@ -46,8 +54,8 @@ impl SegmentEntry {
|
||||
}
|
||||
|
||||
/// Set the `SegmentMeta` for this segment.
|
||||
pub fn set_meta(&mut self, segment_meta: SegmentMeta) {
|
||||
self.meta = segment_meta;
|
||||
pub fn set_segment(&mut self, segment: Segment) {
|
||||
self.segment = segment;
|
||||
}
|
||||
|
||||
/// Return a reference to the segment_entry's delete cursor
|
||||
@@ -57,17 +65,22 @@ impl SegmentEntry {
|
||||
|
||||
/// Returns the segment id.
|
||||
pub fn segment_id(&self) -> SegmentId {
|
||||
self.meta.id()
|
||||
self.segment.id()
|
||||
}
|
||||
|
||||
/// Accessor to the `SegmentMeta`
|
||||
pub fn meta(&self) -> &SegmentMeta {
|
||||
&self.meta
|
||||
self.segment.meta()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SegmentEntry {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(formatter, "SegmentEntry({:?})", self.meta)
|
||||
let num_deletes = self.delete_bitset.as_ref().map(|bitset| bitset.len());
|
||||
write!(
|
||||
formatter,
|
||||
"SegmentEntry(seg={:?}, ndel={:?})",
|
||||
self.segment, num_deletes
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::indexer::delete_queue::DeleteCursor;
|
||||
use crate::indexer::SegmentEntry;
|
||||
use crate::Index;
|
||||
use std::collections::hash_set::HashSet;
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
use std::sync::RwLock;
|
||||
@@ -74,13 +75,19 @@ pub fn get_mergeable_segments(
|
||||
|
||||
impl SegmentManager {
|
||||
pub fn from_segments(
|
||||
index: &Index,
|
||||
segment_metas: Vec<SegmentMeta>,
|
||||
delete_cursor: &DeleteCursor,
|
||||
) -> SegmentManager {
|
||||
SegmentManager {
|
||||
registers: RwLock::new(SegmentRegisters {
|
||||
uncommitted: SegmentRegister::default(),
|
||||
committed: SegmentRegister::new(segment_metas, delete_cursor),
|
||||
committed: SegmentRegister::new(
|
||||
index.directory(),
|
||||
&index.schema(),
|
||||
segment_metas,
|
||||
delete_cursor,
|
||||
),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::directory::ManagedDirectory;
|
||||
use crate::indexer::delete_queue::DeleteCursor;
|
||||
use crate::indexer::segment_entry::SegmentEntry;
|
||||
use crate::schema::Schema;
|
||||
use crate::Segment;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
@@ -79,11 +82,17 @@ impl SegmentRegister {
|
||||
self.segment_states.get(segment_id).cloned()
|
||||
}
|
||||
|
||||
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
|
||||
pub fn new(
|
||||
directory: &ManagedDirectory,
|
||||
schema: &Schema,
|
||||
segment_metas: Vec<SegmentMeta>,
|
||||
delete_cursor: &DeleteCursor,
|
||||
) -> SegmentRegister {
|
||||
let mut segment_states = HashMap::new();
|
||||
for segment_meta in segment_metas {
|
||||
let segment_id = segment_meta.id();
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None);
|
||||
for meta in segment_metas {
|
||||
let segment_id = meta.id();
|
||||
let segment = Segment::new_persisted(meta, directory.clone(), schema.clone());
|
||||
let segment_entry = SegmentEntry::new(segment, delete_cursor.clone(), None);
|
||||
segment_states.insert(segment_id, segment_entry);
|
||||
}
|
||||
SegmentRegister { segment_states }
|
||||
@@ -108,6 +117,7 @@ mod tests {
|
||||
fn test_segment_register() {
|
||||
let inventory = SegmentMetaInventory::default();
|
||||
let delete_queue = DeleteQueue::new();
|
||||
let schema = Schema::builder().build();
|
||||
|
||||
let mut segment_register = SegmentRegister::default();
|
||||
let segment_id_a = SegmentId::generate_random();
|
||||
@@ -115,21 +125,24 @@ mod tests {
|
||||
let segment_id_merged = SegmentId::generate_random();
|
||||
|
||||
{
|
||||
let segment_meta = inventory.new_segment_meta(segment_id_a, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
let meta = inventory.new_segment_meta(segment_id_a, 0u32);
|
||||
let segment = Segment::new_volatile(meta, schema.clone());
|
||||
let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
|
||||
{
|
||||
let segment_meta = inventory.new_segment_meta(segment_id_b, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
let segment = Segment::new_volatile(segment_meta, schema.clone());
|
||||
let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
segment_register.remove_segment(&segment_id_a);
|
||||
segment_register.remove_segment(&segment_id_b);
|
||||
{
|
||||
let segment_meta_merged = inventory.new_segment_meta(segment_id_merged, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
|
||||
let segment = Segment::new_volatile(segment_meta_merged, schema);
|
||||
let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]);
|
||||
|
||||
@@ -134,11 +134,13 @@ fn merge(
|
||||
// ... we just serialize this index merger in our new segment to merge the two segments.
|
||||
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?;
|
||||
|
||||
let num_docs = merger.write(segment_serializer)?;
|
||||
let max_doc = merger.write(segment_serializer)?;
|
||||
|
||||
let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs);
|
||||
|
||||
Ok(SegmentEntry::new(segment_meta, delete_cursor, None))
|
||||
Ok(SegmentEntry::new(
|
||||
merged_segment.with_max_doc(max_doc),
|
||||
delete_cursor,
|
||||
None,
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) struct InnerSegmentUpdater {
|
||||
@@ -167,7 +169,7 @@ impl SegmentUpdater {
|
||||
delete_cursor: &DeleteCursor,
|
||||
) -> crate::Result<SegmentUpdater> {
|
||||
let segments = index.searchable_segment_metas()?;
|
||||
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
|
||||
let segment_manager = SegmentManager::from_segments(&index, segments, delete_cursor);
|
||||
let pool = ThreadPoolBuilder::new()
|
||||
.name_prefix("segment_updater")
|
||||
.pool_size(1)
|
||||
@@ -228,10 +230,12 @@ impl SegmentUpdater {
|
||||
|
||||
pub fn schedule_add_segment(
|
||||
&self,
|
||||
segment_entry: SegmentEntry,
|
||||
mut segment_entry: SegmentEntry,
|
||||
) -> impl Future<Output = crate::Result<()>> {
|
||||
// TODO temporary: serializing the segment at this point.
|
||||
let segment_updater = self.clone();
|
||||
self.schedule_future(async move {
|
||||
segment_entry.persist(segment_updater.index.directory().clone())?;
|
||||
segment_updater.segment_manager.add_segment(segment_entry);
|
||||
segment_updater.consider_merge_options().await;
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user