mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-03 07:42:54 +00:00
Compare commits
3 Commits
remove_ran
...
bundle
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7973892a2 | ||
|
|
cd7484c035 | ||
|
|
7ed6bc8718 |
@@ -186,7 +186,7 @@ mod test {
|
|||||||
use super::{CompositeFile, CompositeWrite};
|
use super::{CompositeFile, CompositeWrite};
|
||||||
use crate::common::BinarySerializable;
|
use crate::common::BinarySerializable;
|
||||||
use crate::common::VInt;
|
use crate::common::VInt;
|
||||||
use crate::directory::{Directory, RAMDirectory};
|
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory};
|
||||||
use crate::schema::Field;
|
use crate::schema::Field;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|||||||
@@ -338,7 +338,7 @@ impl Index {
|
|||||||
|
|
||||||
/// Creates a new segment.
|
/// Creates a new segment.
|
||||||
pub fn new_segment(&self) -> Segment {
|
pub fn new_segment(&self) -> Segment {
|
||||||
let segment_meta = self
|
let mut segment_meta = self
|
||||||
.inventory
|
.inventory
|
||||||
.new_segment_meta(SegmentId::generate_random(), 0);
|
.new_segment_meta(SegmentId::generate_random(), 0);
|
||||||
self.segment(segment_meta)
|
self.segment(segment_meta)
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ impl SegmentMetaInventory {
|
|||||||
segment_id,
|
segment_id,
|
||||||
max_doc,
|
max_doc,
|
||||||
deletes: None,
|
deletes: None,
|
||||||
|
bundled: false,
|
||||||
};
|
};
|
||||||
SegmentMeta::from(self.inventory.track(inner))
|
SegmentMeta::from(self.inventory.track(inner))
|
||||||
}
|
}
|
||||||
@@ -81,6 +82,19 @@ impl SegmentMeta {
|
|||||||
self.tracked.segment_id
|
self.tracked.segment_id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn with_bundled(self) -> SegmentMeta {
|
||||||
|
SegmentMeta::from(self.tracked.map(|inner| InnerSegmentMeta {
|
||||||
|
segment_id: inner.segment_id,
|
||||||
|
max_doc: inner.max_doc,
|
||||||
|
deletes: inner.deletes.clone(),
|
||||||
|
bundled: true,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_bundled(&self) -> bool {
|
||||||
|
self.tracked.bundled
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the number of deleted documents.
|
/// Returns the number of deleted documents.
|
||||||
pub fn num_deleted_docs(&self) -> u32 {
|
pub fn num_deleted_docs(&self) -> u32 {
|
||||||
self.tracked
|
self.tracked
|
||||||
@@ -107,8 +121,12 @@ impl SegmentMeta {
|
|||||||
/// It just joins the segment id with the extension
|
/// It just joins the segment id with the extension
|
||||||
/// associated to a segment component.
|
/// associated to a segment component.
|
||||||
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
|
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
|
||||||
let mut path = self.id().uuid_string();
|
let suffix = self.suffix(component);
|
||||||
path.push_str(&*match component {
|
self.relative_path_from_suffix(&suffix)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn suffix(&self, component: SegmentComponent) -> String {
|
||||||
|
match component {
|
||||||
SegmentComponent::POSTINGS => ".idx".to_string(),
|
SegmentComponent::POSTINGS => ".idx".to_string(),
|
||||||
SegmentComponent::POSITIONS => ".pos".to_string(),
|
SegmentComponent::POSITIONS => ".pos".to_string(),
|
||||||
SegmentComponent::POSITIONSSKIP => ".posidx".to_string(),
|
SegmentComponent::POSITIONSSKIP => ".posidx".to_string(),
|
||||||
@@ -117,7 +135,17 @@ impl SegmentMeta {
|
|||||||
SegmentComponent::FASTFIELDS => ".fast".to_string(),
|
SegmentComponent::FASTFIELDS => ".fast".to_string(),
|
||||||
SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
|
SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
|
||||||
SegmentComponent::DELETE => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
|
SegmentComponent::DELETE => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
|
||||||
});
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the relative path of a component of our segment.
|
||||||
|
///
|
||||||
|
/// It just joins the segment id with the extension
|
||||||
|
/// associated to a segment component.
|
||||||
|
pub fn relative_path_from_suffix(&self, suffix: &str) -> PathBuf {
|
||||||
|
let mut path = self.id().uuid_string();
|
||||||
|
path.push_str(".");
|
||||||
|
path.push_str(&suffix);
|
||||||
PathBuf::from(path)
|
PathBuf::from(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -161,6 +189,7 @@ impl SegmentMeta {
|
|||||||
segment_id: inner_meta.segment_id,
|
segment_id: inner_meta.segment_id,
|
||||||
max_doc,
|
max_doc,
|
||||||
deletes: None,
|
deletes: None,
|
||||||
|
bundled: inner_meta.bundled,
|
||||||
});
|
});
|
||||||
SegmentMeta { tracked }
|
SegmentMeta { tracked }
|
||||||
}
|
}
|
||||||
@@ -175,6 +204,7 @@ impl SegmentMeta {
|
|||||||
segment_id: inner_meta.segment_id,
|
segment_id: inner_meta.segment_id,
|
||||||
max_doc: inner_meta.max_doc,
|
max_doc: inner_meta.max_doc,
|
||||||
deletes: Some(delete_meta),
|
deletes: Some(delete_meta),
|
||||||
|
bundled: inner_meta.bundled,
|
||||||
});
|
});
|
||||||
SegmentMeta { tracked }
|
SegmentMeta { tracked }
|
||||||
}
|
}
|
||||||
@@ -185,6 +215,7 @@ struct InnerSegmentMeta {
|
|||||||
segment_id: SegmentId,
|
segment_id: SegmentId,
|
||||||
max_doc: u32,
|
max_doc: u32,
|
||||||
deletes: Option<DeleteMeta>,
|
deletes: Option<DeleteMeta>,
|
||||||
|
bundled: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InnerSegmentMeta {
|
impl InnerSegmentMeta {
|
||||||
|
|||||||
@@ -4,14 +4,12 @@ use crate::core::SegmentId;
|
|||||||
use crate::core::SegmentMeta;
|
use crate::core::SegmentMeta;
|
||||||
use crate::directory::error::{OpenReadError, OpenWriteError};
|
use crate::directory::error::{OpenReadError, OpenWriteError};
|
||||||
use crate::directory::Directory;
|
use crate::directory::Directory;
|
||||||
use crate::directory::{ReadOnlySource, WritePtr};
|
use crate::directory::{ReadOnlyDirectory, ReadOnlySource, WritePtr};
|
||||||
use crate::indexer::segment_serializer::SegmentSerializer;
|
use crate::indexer::segment_serializer::SegmentSerializer;
|
||||||
use crate::schema::Schema;
|
use crate::schema::Schema;
|
||||||
use crate::Opstamp;
|
use crate::Opstamp;
|
||||||
use crate::Result;
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::result;
|
|
||||||
|
|
||||||
/// A segment is a piece of the index.
|
/// A segment is a piece of the index.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -83,23 +81,30 @@ impl Segment {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Open one of the component file for a *regular* read.
|
/// Open one of the component file for a *regular* read.
|
||||||
pub fn open_read(
|
pub fn open_read(&self, component: SegmentComponent) -> Result<ReadOnlySource, OpenReadError> {
|
||||||
&self,
|
|
||||||
component: SegmentComponent,
|
|
||||||
) -> result::Result<ReadOnlySource, OpenReadError> {
|
|
||||||
let path = self.relative_path(component);
|
let path = self.relative_path(component);
|
||||||
let source = self.index.directory().open_read(&path)?;
|
let source = self.index.directory().open_read(&path)?;
|
||||||
Ok(source)
|
Ok(source)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Open one of the component file for *regular* write.
|
/// Open one of the component file for *regular* write.
|
||||||
pub fn open_write(
|
pub fn open_write(&mut self, component: SegmentComponent) -> Result<WritePtr, OpenWriteError> {
|
||||||
|
let path = self.relative_path(component);
|
||||||
|
self.index.directory_mut().open_write(&path)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn open_bundle_writer(&mut self) -> Result<WritePtr, OpenWriteError> {
|
||||||
|
let path = self.meta.relative_path_from_suffix("bundle");
|
||||||
|
self.index.directory_mut().open_write(&path)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn open_write_in_directory(
|
||||||
&mut self,
|
&mut self,
|
||||||
component: SegmentComponent,
|
component: SegmentComponent,
|
||||||
) -> result::Result<WritePtr, OpenWriteError> {
|
directory: &mut dyn Directory,
|
||||||
|
) -> Result<WritePtr, OpenWriteError> {
|
||||||
let path = self.relative_path(component);
|
let path = self.relative_path(component);
|
||||||
let write = self.index.directory_mut().open_write(&path)?;
|
directory.open_write(&path)
|
||||||
Ok(write)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -109,5 +114,5 @@ pub trait SerializableSegment {
|
|||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
/// The number of documents in the segment.
|
/// The number of documents in the segment.
|
||||||
fn write(&self, serializer: SegmentSerializer) -> Result<u32>;
|
fn write(&self, serializer: SegmentSerializer) -> crate::Result<u32>;
|
||||||
}
|
}
|
||||||
|
|||||||
97
src/directory/bundle_directory.rs
Normal file
97
src/directory/bundle_directory.rs
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
use crate::directory::directory::ReadOnlyDirectory;
|
||||||
|
use crate::directory::error::OpenReadError;
|
||||||
|
use crate::directory::ReadOnlySource;
|
||||||
|
use crate::error::DataCorruption;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct BundleDirectory {
|
||||||
|
source_map: Arc<HashMap<PathBuf, ReadOnlySource>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BundleDirectory {
|
||||||
|
pub fn from_source(source: ReadOnlySource) -> Result<BundleDirectory, DataCorruption> {
|
||||||
|
let mut index_offset_buf = [0u8; 8];
|
||||||
|
let (body_idx, footer_offset) = source.split_from_end(8);
|
||||||
|
index_offset_buf.copy_from_slice(footer_offset.as_slice());
|
||||||
|
let offset = u64::from_le_bytes(index_offset_buf);
|
||||||
|
let (body_source, idx_source) = body_idx.split(offset as usize);
|
||||||
|
let idx: HashMap<PathBuf, (u64, u64)> = serde_json::from_slice(idx_source.as_slice())
|
||||||
|
.map_err(|err| {
|
||||||
|
let msg = format!("Failed to read index from bundle. {:?}", err);
|
||||||
|
DataCorruption::comment_only(msg)
|
||||||
|
})?;
|
||||||
|
let source_map: HashMap<PathBuf, ReadOnlySource> = idx
|
||||||
|
.into_iter()
|
||||||
|
.map(|(path, (start, stop))| {
|
||||||
|
let source = body_source.slice(start as usize, stop as usize);
|
||||||
|
(path, source)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Ok(BundleDirectory {
|
||||||
|
source_map: Arc::new(source_map),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReadOnlyDirectory for BundleDirectory {
|
||||||
|
fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
|
||||||
|
self.source_map
|
||||||
|
.get(path)
|
||||||
|
.cloned()
|
||||||
|
.ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exists(&self, path: &Path) -> bool {
|
||||||
|
self.source_map.contains_key(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||||
|
let source = self
|
||||||
|
.source_map
|
||||||
|
.get(path)
|
||||||
|
.ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?;
|
||||||
|
Ok(source.as_slice().to_vec())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::BundleDirectory;
|
||||||
|
use crate::directory::{RAMDirectory, ReadOnlyDirectory, TerminatingWrite};
|
||||||
|
use crate::Directory;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_bundle_directory() {
|
||||||
|
let mut ram_directory = RAMDirectory::default();
|
||||||
|
let test_path_atomic = Path::new("testpath_atomic");
|
||||||
|
let test_path_wrt = Path::new("testpath_wrt");
|
||||||
|
assert!(ram_directory
|
||||||
|
.atomic_write(test_path_atomic, b"titi")
|
||||||
|
.is_ok());
|
||||||
|
{
|
||||||
|
let mut test_wrt = ram_directory.open_write(test_path_wrt).unwrap();
|
||||||
|
assert!(test_wrt.write_all(b"toto").is_ok());
|
||||||
|
assert!(test_wrt.terminate().is_ok());
|
||||||
|
}
|
||||||
|
let mut dest_directory = RAMDirectory::default();
|
||||||
|
let bundle_path = Path::new("bundle");
|
||||||
|
let mut wrt = dest_directory.open_write(bundle_path).unwrap();
|
||||||
|
assert!(ram_directory.serialize_bundle(&mut wrt).is_ok());
|
||||||
|
assert!(wrt.terminate().is_ok());
|
||||||
|
let source = dest_directory.open_read(bundle_path).unwrap();
|
||||||
|
let bundle_directory = BundleDirectory::from_source(source).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
&bundle_directory.atomic_read(test_path_atomic).unwrap()[..],
|
||||||
|
b"titi"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
&bundle_directory.open_read(test_path_wrt).unwrap()[..],
|
||||||
|
b"toto"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -100,17 +100,7 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write-once read many (WORM) abstraction for where
|
pub trait ReadOnlyDirectory {
|
||||||
/// tantivy's data should be stored.
|
|
||||||
///
|
|
||||||
/// There are currently two implementations of `Directory`
|
|
||||||
///
|
|
||||||
/// - The [`MMapDirectory`](struct.MmapDirectory.html), this
|
|
||||||
/// should be your default choice.
|
|
||||||
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
|
|
||||||
/// should be used mostly for tests.
|
|
||||||
///
|
|
||||||
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
|
||||||
/// Opens a virtual file for read.
|
/// Opens a virtual file for read.
|
||||||
///
|
///
|
||||||
/// Once a virtual file is open, its data may not
|
/// Once a virtual file is open, its data may not
|
||||||
@@ -122,6 +112,31 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
|||||||
/// You should only use this to read files create with [Directory::open_write].
|
/// You should only use this to read files create with [Directory::open_write].
|
||||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError>;
|
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError>;
|
||||||
|
|
||||||
|
/// Returns true iff the file exists
|
||||||
|
fn exists(&self, path: &Path) -> bool;
|
||||||
|
|
||||||
|
/// Reads the full content file that has been written using
|
||||||
|
/// atomic_write.
|
||||||
|
///
|
||||||
|
/// This should only be used for small files.
|
||||||
|
///
|
||||||
|
/// You should only use this to read files create with [Directory::atomic_write].
|
||||||
|
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write-once read many (WORM) abstraction for where
|
||||||
|
/// tantivy's data should be stored.
|
||||||
|
///
|
||||||
|
/// There are currently two implementations of `Directory`
|
||||||
|
///
|
||||||
|
/// - The [`MMapDirectory`](struct.MmapDirectory.html), this
|
||||||
|
/// should be your default choice.
|
||||||
|
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
|
||||||
|
/// should be used mostly for tests.
|
||||||
|
///
|
||||||
|
pub trait Directory:
|
||||||
|
DirectoryClone + ReadOnlyDirectory + fmt::Debug + Send + Sync + 'static
|
||||||
|
{
|
||||||
/// Removes a file
|
/// Removes a file
|
||||||
///
|
///
|
||||||
/// Removing a file will not affect an eventual
|
/// Removing a file will not affect an eventual
|
||||||
@@ -131,9 +146,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
|||||||
/// `DeleteError::DoesNotExist`.
|
/// `DeleteError::DoesNotExist`.
|
||||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError>;
|
fn delete(&self, path: &Path) -> result::Result<(), DeleteError>;
|
||||||
|
|
||||||
/// Returns true iff the file exists
|
|
||||||
fn exists(&self, path: &Path) -> bool;
|
|
||||||
|
|
||||||
/// Opens a writer for the *virtual file* associated with
|
/// Opens a writer for the *virtual file* associated with
|
||||||
/// a Path.
|
/// a Path.
|
||||||
///
|
///
|
||||||
@@ -155,14 +167,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
|||||||
/// The file may not previously exist.
|
/// The file may not previously exist.
|
||||||
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError>;
|
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError>;
|
||||||
|
|
||||||
/// Reads the full content file that has been written using
|
|
||||||
/// atomic_write.
|
|
||||||
///
|
|
||||||
/// This should only be used for small files.
|
|
||||||
///
|
|
||||||
/// You should only use this to read files create with [Directory::atomic_write].
|
|
||||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
|
|
||||||
|
|
||||||
/// Atomically replace the content of a file with data.
|
/// Atomically replace the content of a file with data.
|
||||||
///
|
///
|
||||||
/// This calls ensure that reads can never *observe*
|
/// This calls ensure that reads can never *observe*
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ use crate::directory::{WatchCallback, WatchHandle};
|
|||||||
use crate::error::DataCorruption;
|
use crate::error::DataCorruption;
|
||||||
use crate::Directory;
|
use crate::Directory;
|
||||||
|
|
||||||
|
use crate::directory::directory::ReadOnlyDirectory;
|
||||||
use crc32fast::Hasher;
|
use crc32fast::Hasher;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
@@ -264,14 +265,6 @@ impl ManagedDirectory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Directory for ManagedDirectory {
|
impl Directory for ManagedDirectory {
|
||||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
|
||||||
let read_only_source = self.directory.open_read(path)?;
|
|
||||||
let (footer, reader) = Footer::extract_footer(read_only_source)
|
|
||||||
.map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
|
|
||||||
footer.is_compatible()?;
|
|
||||||
Ok(reader)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
|
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
|
||||||
self.register_file_as_managed(path)
|
self.register_file_as_managed(path)
|
||||||
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
|
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
|
||||||
@@ -289,18 +282,10 @@ impl Directory for ManagedDirectory {
|
|||||||
self.directory.atomic_write(path, data)
|
self.directory.atomic_write(path, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
|
|
||||||
self.directory.atomic_read(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
||||||
self.directory.delete(path)
|
self.directory.delete(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn exists(&self, path: &Path) -> bool {
|
|
||||||
self.directory.exists(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
|
fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
|
||||||
self.directory.acquire_lock(lock)
|
self.directory.acquire_lock(lock)
|
||||||
}
|
}
|
||||||
@@ -310,6 +295,24 @@ impl Directory for ManagedDirectory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ReadOnlyDirectory for ManagedDirectory {
|
||||||
|
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||||
|
let read_only_source = self.directory.open_read(path)?;
|
||||||
|
let (footer, reader) = Footer::extract_footer(read_only_source)
|
||||||
|
.map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
|
||||||
|
footer.is_compatible()?;
|
||||||
|
Ok(reader)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exists(&self, path: &Path) -> bool {
|
||||||
|
self.directory.exists(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
|
||||||
|
self.directory.atomic_read(path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Clone for ManagedDirectory {
|
impl Clone for ManagedDirectory {
|
||||||
fn clone(&self) -> ManagedDirectory {
|
fn clone(&self) -> ManagedDirectory {
|
||||||
ManagedDirectory {
|
ManagedDirectory {
|
||||||
@@ -323,7 +326,9 @@ impl Clone for ManagedDirectory {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests_mmap_specific {
|
mod tests_mmap_specific {
|
||||||
|
|
||||||
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};
|
use crate::directory::{
|
||||||
|
Directory, ManagedDirectory, MmapDirectory, ReadOnlyDirectory, TerminatingWrite,
|
||||||
|
};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::fs::OpenOptions;
|
use std::fs::OpenOptions;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use self::notify::RawEvent;
|
|||||||
use self::notify::RecursiveMode;
|
use self::notify::RecursiveMode;
|
||||||
use self::notify::Watcher;
|
use self::notify::Watcher;
|
||||||
use crate::core::META_FILEPATH;
|
use crate::core::META_FILEPATH;
|
||||||
|
use crate::directory::directory::ReadOnlyDirectory;
|
||||||
use crate::directory::error::LockError;
|
use crate::directory::error::LockError;
|
||||||
use crate::directory::error::{
|
use crate::directory::error::{
|
||||||
DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError,
|
DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError,
|
||||||
@@ -407,24 +408,6 @@ impl TerminatingWrite for SafeFileWriter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Directory for MmapDirectory {
|
impl Directory for MmapDirectory {
|
||||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
|
||||||
debug!("Open Read {:?}", path);
|
|
||||||
let full_path = self.resolve_path(path);
|
|
||||||
|
|
||||||
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
|
|
||||||
let msg = format!(
|
|
||||||
"Failed to acquired write lock \
|
|
||||||
on mmap cache while reading {:?}",
|
|
||||||
path
|
|
||||||
);
|
|
||||||
IOError::with_path(path.to_owned(), make_io_err(msg))
|
|
||||||
})?;
|
|
||||||
Ok(mmap_cache
|
|
||||||
.get_mmap(&full_path)?
|
|
||||||
.map(ReadOnlySource::from)
|
|
||||||
.unwrap_or_else(ReadOnlySource::empty))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Any entry associated to the path in the mmap will be
|
/// Any entry associated to the path in the mmap will be
|
||||||
/// removed before the file is deleted.
|
/// removed before the file is deleted.
|
||||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
||||||
@@ -443,11 +426,6 @@ impl Directory for MmapDirectory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn exists(&self, path: &Path) -> bool {
|
|
||||||
let full_path = self.resolve_path(path);
|
|
||||||
full_path.exists()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||||
debug!("Open Write {:?}", path);
|
debug!("Open Write {:?}", path);
|
||||||
let full_path = self.resolve_path(path);
|
let full_path = self.resolve_path(path);
|
||||||
@@ -478,25 +456,6 @@ impl Directory for MmapDirectory {
|
|||||||
Ok(BufWriter::new(Box::new(writer)))
|
Ok(BufWriter::new(Box::new(writer)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
|
||||||
let full_path = self.resolve_path(path);
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
match File::open(&full_path) {
|
|
||||||
Ok(mut file) => {
|
|
||||||
file.read_to_end(&mut buffer)
|
|
||||||
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
|
|
||||||
Ok(buffer)
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
if e.kind() == io::ErrorKind::NotFound {
|
|
||||||
Err(OpenReadError::FileDoesNotExist(path.to_owned()))
|
|
||||||
} else {
|
|
||||||
Err(IOError::with_path(path.to_owned(), e).into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
|
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
|
||||||
debug!("Atomic Write {:?}", path);
|
debug!("Atomic Write {:?}", path);
|
||||||
let full_path = self.resolve_path(path);
|
let full_path = self.resolve_path(path);
|
||||||
@@ -530,6 +489,50 @@ impl Directory for MmapDirectory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ReadOnlyDirectory for MmapDirectory {
|
||||||
|
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||||
|
debug!("Open Read {:?}", path);
|
||||||
|
let full_path = self.resolve_path(path);
|
||||||
|
|
||||||
|
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
|
||||||
|
let msg = format!(
|
||||||
|
"Failed to acquired write lock \
|
||||||
|
on mmap cache while reading {:?}",
|
||||||
|
path
|
||||||
|
);
|
||||||
|
IOError::with_path(path.to_owned(), make_io_err(msg))
|
||||||
|
})?;
|
||||||
|
Ok(mmap_cache
|
||||||
|
.get_mmap(&full_path)?
|
||||||
|
.map(ReadOnlySource::from)
|
||||||
|
.unwrap_or_else(ReadOnlySource::empty))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exists(&self, path: &Path) -> bool {
|
||||||
|
let full_path = self.resolve_path(path);
|
||||||
|
full_path.exists()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||||
|
let full_path = self.resolve_path(path);
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
match File::open(&full_path) {
|
||||||
|
Ok(mut file) => {
|
||||||
|
file.read_to_end(&mut buffer)
|
||||||
|
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
|
||||||
|
Ok(buffer)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
if e.kind() == io::ErrorKind::NotFound {
|
||||||
|
Err(OpenReadError::FileDoesNotExist(path.to_owned()))
|
||||||
|
} else {
|
||||||
|
Err(IOError::with_path(path.to_owned(), e).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ WORM directory abstraction.
|
|||||||
#[cfg(feature = "mmap")]
|
#[cfg(feature = "mmap")]
|
||||||
mod mmap_directory;
|
mod mmap_directory;
|
||||||
|
|
||||||
|
mod bundle_directory;
|
||||||
mod directory;
|
mod directory;
|
||||||
mod directory_lock;
|
mod directory_lock;
|
||||||
mod footer;
|
mod footer;
|
||||||
@@ -19,7 +20,7 @@ mod watch_event_router;
|
|||||||
pub mod error;
|
pub mod error;
|
||||||
|
|
||||||
pub use self::directory::DirectoryLock;
|
pub use self::directory::DirectoryLock;
|
||||||
pub use self::directory::{Directory, DirectoryClone};
|
pub use self::directory::{Directory, DirectoryClone, ReadOnlyDirectory};
|
||||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
||||||
pub use self::ram_directory::RAMDirectory;
|
pub use self::ram_directory::RAMDirectory;
|
||||||
pub use self::read_only_source::ReadOnlySource;
|
pub use self::read_only_source::ReadOnlySource;
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
|
use crate::common::CountingWriter;
|
||||||
use crate::core::META_FILEPATH;
|
use crate::core::META_FILEPATH;
|
||||||
|
use crate::directory::directory::ReadOnlyDirectory;
|
||||||
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
|
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
|
||||||
use crate::directory::AntiCallToken;
|
use crate::directory::AntiCallToken;
|
||||||
use crate::directory::WatchCallbackList;
|
use crate::directory::WatchCallbackList;
|
||||||
@@ -115,6 +117,22 @@ impl InnerDirectory {
|
|||||||
fn total_mem_usage(&self) -> usize {
|
fn total_mem_usage(&self) -> usize {
|
||||||
self.fs.values().map(|f| f.len()).sum()
|
self.fs.values().map(|f| f.len()).sum()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn serialize_bundle(&self, wrt: &mut WritePtr) -> io::Result<()> {
|
||||||
|
let mut counting_writer = CountingWriter::wrap(wrt);
|
||||||
|
let mut file_index: HashMap<PathBuf, (u64, u64)> = HashMap::default();
|
||||||
|
for (path, source) in &self.fs {
|
||||||
|
let start = counting_writer.written_bytes();
|
||||||
|
counting_writer.write_all(source.as_slice())?;
|
||||||
|
let stop = counting_writer.written_bytes();
|
||||||
|
file_index.insert(path.to_path_buf(), (start, stop));
|
||||||
|
}
|
||||||
|
let index_offset = counting_writer.written_bytes();
|
||||||
|
serde_json::to_writer(&mut counting_writer, &file_index)?;
|
||||||
|
let index_offset_buffer = index_offset.to_le_bytes();
|
||||||
|
counting_writer.write_all(&index_offset_buffer[..])?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for RAMDirectory {
|
impl fmt::Debug for RAMDirectory {
|
||||||
@@ -144,13 +162,18 @@ impl RAMDirectory {
|
|||||||
pub fn total_mem_usage(&self) -> usize {
|
pub fn total_mem_usage(&self) -> usize {
|
||||||
self.fs.read().unwrap().total_mem_usage()
|
self.fs.read().unwrap().total_mem_usage()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Serialize the RAMDirectory into a bundle.
|
||||||
|
///
|
||||||
|
/// This method will fail, write nothing, and return an error if a
|
||||||
|
/// clone of this repository exists.
|
||||||
|
pub fn serialize_bundle(self, wrt: &mut WritePtr) -> io::Result<()> {
|
||||||
|
let inner_directory_rlock = self.fs.read().unwrap();
|
||||||
|
inner_directory_rlock.serialize_bundle(wrt)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Directory for RAMDirectory {
|
impl Directory for RAMDirectory {
|
||||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
|
||||||
self.fs.read().unwrap().open_read(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
||||||
fail_point!("RAMDirectory::delete", |_| {
|
fail_point!("RAMDirectory::delete", |_| {
|
||||||
use crate::directory::error::IOError;
|
use crate::directory::error::IOError;
|
||||||
@@ -160,10 +183,6 @@ impl Directory for RAMDirectory {
|
|||||||
self.fs.write().unwrap().delete(path)
|
self.fs.write().unwrap().delete(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn exists(&self, path: &Path) -> bool {
|
|
||||||
self.fs.read().unwrap().exists(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||||
let mut fs = self.fs.write().unwrap();
|
let mut fs = self.fs.write().unwrap();
|
||||||
let path_buf = PathBuf::from(path);
|
let path_buf = PathBuf::from(path);
|
||||||
@@ -177,10 +196,6 @@ impl Directory for RAMDirectory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
|
||||||
Ok(self.open_read(path)?.as_slice().to_owned())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
|
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
|
||||||
fail_point!("RAMDirectory::atomic_write", |msg| Err(io::Error::new(
|
fail_point!("RAMDirectory::atomic_write", |msg| Err(io::Error::new(
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
@@ -204,3 +219,17 @@ impl Directory for RAMDirectory {
|
|||||||
Ok(self.fs.write().unwrap().watch(watch_callback))
|
Ok(self.fs.write().unwrap().watch(watch_callback))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ReadOnlyDirectory for RAMDirectory {
|
||||||
|
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||||
|
self.fs.read().unwrap().open_read(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exists(&self, path: &Path) -> bool {
|
||||||
|
self.fs.read().unwrap().exists(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||||
|
Ok(self.open_read(path)?.as_slice().to_owned())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -25,10 +25,10 @@ impl DataCorruption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn comment_only(comment: String) -> DataCorruption {
|
pub fn comment_only<TS: ToString>(comment: TS) -> DataCorruption {
|
||||||
DataCorruption {
|
DataCorruption {
|
||||||
filepath: None,
|
filepath: None,
|
||||||
comment,
|
comment: comment.to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ mod tests {
|
|||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::common::CompositeFile;
|
use crate::common::CompositeFile;
|
||||||
use crate::directory::{Directory, RAMDirectory, WritePtr};
|
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
|
||||||
use crate::fastfield::FastFieldReader;
|
use crate::fastfield::FastFieldReader;
|
||||||
use crate::merge_policy::NoMergePolicy;
|
use crate::merge_policy::NoMergePolicy;
|
||||||
use crate::schema::Field;
|
use crate::schema::Field;
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use crate::common::compute_num_bits;
|
|||||||
use crate::common::BinarySerializable;
|
use crate::common::BinarySerializable;
|
||||||
use crate::common::CompositeFile;
|
use crate::common::CompositeFile;
|
||||||
use crate::directory::ReadOnlySource;
|
use crate::directory::ReadOnlySource;
|
||||||
use crate::directory::{Directory, RAMDirectory, WritePtr};
|
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
|
||||||
use crate::fastfield::{FastFieldSerializer, FastFieldsWriter};
|
use crate::fastfield::{FastFieldSerializer, FastFieldsWriter};
|
||||||
use crate::schema::Schema;
|
use crate::schema::Schema;
|
||||||
use crate::schema::FAST;
|
use crate::schema::FAST;
|
||||||
|
|||||||
@@ -1,10 +1,13 @@
|
|||||||
use crate::Result;
|
use crate::Directory;
|
||||||
|
|
||||||
use crate::core::Segment;
|
use crate::core::Segment;
|
||||||
use crate::core::SegmentComponent;
|
use crate::core::SegmentComponent;
|
||||||
|
use crate::directory::error::OpenWriteError;
|
||||||
|
use crate::directory::{DirectoryClone, RAMDirectory, TerminatingWrite, WritePtr};
|
||||||
use crate::fastfield::FastFieldSerializer;
|
use crate::fastfield::FastFieldSerializer;
|
||||||
use crate::fieldnorm::FieldNormsSerializer;
|
use crate::fieldnorm::FieldNormsSerializer;
|
||||||
use crate::postings::InvertedIndexSerializer;
|
use crate::postings::InvertedIndexSerializer;
|
||||||
|
use crate::schema::Schema;
|
||||||
use crate::store::StoreWriter;
|
use crate::store::StoreWriter;
|
||||||
|
|
||||||
/// Segment serializer is in charge of laying out on disk
|
/// Segment serializer is in charge of laying out on disk
|
||||||
@@ -14,25 +17,50 @@ pub struct SegmentSerializer {
|
|||||||
fast_field_serializer: FastFieldSerializer,
|
fast_field_serializer: FastFieldSerializer,
|
||||||
fieldnorms_serializer: FieldNormsSerializer,
|
fieldnorms_serializer: FieldNormsSerializer,
|
||||||
postings_serializer: InvertedIndexSerializer,
|
postings_serializer: InvertedIndexSerializer,
|
||||||
|
bundle_writer: Option<(RAMDirectory, WritePtr)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct SegmentSerializerWriters {
|
||||||
|
postings_wrt: WritePtr,
|
||||||
|
positions_skip_wrt: WritePtr,
|
||||||
|
positions_wrt: WritePtr,
|
||||||
|
terms_wrt: WritePtr,
|
||||||
|
fast_field_wrt: WritePtr,
|
||||||
|
fieldnorms_wrt: WritePtr,
|
||||||
|
store_wrt: WritePtr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SegmentSerializerWriters {
|
||||||
|
pub(crate) fn for_segment(segment: &mut Segment) -> Result<Self, OpenWriteError> {
|
||||||
|
Ok(SegmentSerializerWriters {
|
||||||
|
postings_wrt: segment.open_write(SegmentComponent::POSTINGS)?,
|
||||||
|
positions_skip_wrt: segment.open_write(SegmentComponent::POSITIONS)?,
|
||||||
|
positions_wrt: segment.open_write(SegmentComponent::POSITIONSSKIP)?,
|
||||||
|
terms_wrt: segment.open_write(SegmentComponent::TERMS)?,
|
||||||
|
fast_field_wrt: segment.open_write(SegmentComponent::FASTFIELDS)?,
|
||||||
|
fieldnorms_wrt: segment.open_write(SegmentComponent::FIELDNORMS)?,
|
||||||
|
store_wrt: segment.open_write(SegmentComponent::STORE)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SegmentSerializer {
|
impl SegmentSerializer {
|
||||||
/// Creates a new `SegmentSerializer`.
|
pub(crate) fn new(schema: Schema, writers: SegmentSerializerWriters) -> crate::Result<Self> {
|
||||||
pub fn for_segment(segment: &mut Segment) -> Result<SegmentSerializer> {
|
let fast_field_serializer = FastFieldSerializer::from_write(writers.fast_field_wrt)?;
|
||||||
let store_write = segment.open_write(SegmentComponent::STORE)?;
|
let fieldnorms_serializer = FieldNormsSerializer::from_write(writers.fieldnorms_wrt)?;
|
||||||
|
let postings_serializer = InvertedIndexSerializer::open(
|
||||||
let fast_field_write = segment.open_write(SegmentComponent::FASTFIELDS)?;
|
schema,
|
||||||
let fast_field_serializer = FastFieldSerializer::from_write(fast_field_write)?;
|
writers.terms_wrt,
|
||||||
|
writers.postings_wrt,
|
||||||
let fieldnorms_write = segment.open_write(SegmentComponent::FIELDNORMS)?;
|
writers.positions_wrt,
|
||||||
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;
|
writers.positions_skip_wrt,
|
||||||
|
);
|
||||||
let postings_serializer = InvertedIndexSerializer::open(segment)?;
|
|
||||||
Ok(SegmentSerializer {
|
Ok(SegmentSerializer {
|
||||||
store_writer: StoreWriter::new(store_write),
|
store_writer: StoreWriter::new(writers.store_wrt),
|
||||||
fast_field_serializer,
|
fast_field_serializer,
|
||||||
fieldnorms_serializer,
|
fieldnorms_serializer,
|
||||||
postings_serializer,
|
postings_serializer,
|
||||||
|
bundle_writer: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,11 +85,15 @@ impl SegmentSerializer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Finalize the segment serialization.
|
/// Finalize the segment serialization.
|
||||||
pub fn close(self) -> Result<()> {
|
pub fn close(mut self) -> crate::Result<()> {
|
||||||
self.fast_field_serializer.close()?;
|
self.fast_field_serializer.close()?;
|
||||||
self.postings_serializer.close()?;
|
self.postings_serializer.close()?;
|
||||||
self.store_writer.close()?;
|
self.store_writer.close()?;
|
||||||
self.fieldnorms_serializer.close()?;
|
self.fieldnorms_serializer.close()?;
|
||||||
|
if let Some((ram_directory, mut bundle_wrt)) = self.bundle_writer.take() {
|
||||||
|
ram_directory.serialize_bundle(&mut bundle_wrt)?;
|
||||||
|
bundle_wrt.terminate()?;
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ use crate::indexer::index_writer::advance_deletes;
|
|||||||
use crate::indexer::merge_operation::MergeOperationInventory;
|
use crate::indexer::merge_operation::MergeOperationInventory;
|
||||||
use crate::indexer::merger::IndexMerger;
|
use crate::indexer::merger::IndexMerger;
|
||||||
use crate::indexer::segment_manager::SegmentsStatus;
|
use crate::indexer::segment_manager::SegmentsStatus;
|
||||||
|
use crate::indexer::segment_serializer::SegmentSerializerWriters;
|
||||||
use crate::indexer::stamper::Stamper;
|
use crate::indexer::stamper::Stamper;
|
||||||
use crate::indexer::SegmentEntry;
|
use crate::indexer::SegmentEntry;
|
||||||
use crate::indexer::SegmentSerializer;
|
use crate::indexer::SegmentSerializer;
|
||||||
@@ -132,7 +133,9 @@ fn merge(
|
|||||||
let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?;
|
let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?;
|
||||||
|
|
||||||
// ... we just serialize this index merger in our new segment to merge the two segments.
|
// ... we just serialize this index merger in our new segment to merge the two segments.
|
||||||
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?;
|
let segment_serializer_wrts = SegmentSerializerWriters::for_segment(&mut merged_segment)?;
|
||||||
|
let segment_serializer =
|
||||||
|
SegmentSerializer::new(merged_segment.schema(), segment_serializer_wrts)?;
|
||||||
|
|
||||||
let num_docs = merger.write(segment_serializer)?;
|
let num_docs = merger.write(segment_serializer)?;
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use crate::core::Segment;
|
|||||||
use crate::core::SerializableSegment;
|
use crate::core::SerializableSegment;
|
||||||
use crate::fastfield::FastFieldsWriter;
|
use crate::fastfield::FastFieldsWriter;
|
||||||
use crate::fieldnorm::FieldNormsWriter;
|
use crate::fieldnorm::FieldNormsWriter;
|
||||||
use crate::indexer::segment_serializer::SegmentSerializer;
|
use crate::indexer::segment_serializer::{SegmentSerializer, SegmentSerializerWriters};
|
||||||
use crate::postings::compute_table_size;
|
use crate::postings::compute_table_size;
|
||||||
use crate::postings::MultiFieldPostingsWriter;
|
use crate::postings::MultiFieldPostingsWriter;
|
||||||
use crate::schema::FieldType;
|
use crate::schema::FieldType;
|
||||||
@@ -69,7 +69,8 @@ impl SegmentWriter {
|
|||||||
schema: &Schema,
|
schema: &Schema,
|
||||||
) -> Result<SegmentWriter> {
|
) -> Result<SegmentWriter> {
|
||||||
let table_num_bits = initial_table_size(memory_budget)?;
|
let table_num_bits = initial_table_size(memory_budget)?;
|
||||||
let segment_serializer = SegmentSerializer::for_segment(&mut segment)?;
|
let segment_serializer_wrts = SegmentSerializerWriters::for_segment(&mut segment)?;
|
||||||
|
let segment_serializer = SegmentSerializer::new(segment.schema(), segment_serializer_wrts)?;
|
||||||
let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
|
let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
|
||||||
let tokenizers = schema
|
let tokenizers = schema
|
||||||
.fields()
|
.fields()
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ pub mod tests {
|
|||||||
let schema = schema_builder.build();
|
let schema = schema_builder.build();
|
||||||
let index = Index::create_in_ram(schema);
|
let index = Index::create_in_ram(schema);
|
||||||
let mut segment = index.new_segment();
|
let mut segment = index.new_segment();
|
||||||
let mut posting_serializer = InvertedIndexSerializer::open(&mut segment).unwrap();
|
let mut posting_serializer = InvertedIndexSerializer::for_segment(&mut segment).unwrap();
|
||||||
{
|
{
|
||||||
let mut field_serializer = posting_serializer.new_field(text_field, 120 * 4).unwrap();
|
let mut field_serializer = posting_serializer.new_field(text_field, 120 * 4).unwrap();
|
||||||
field_serializer.new_term("abc".as_bytes()).unwrap();
|
field_serializer.new_term("abc".as_bytes()).unwrap();
|
||||||
|
|||||||
@@ -10,8 +10,8 @@ use crate::postings::USE_SKIP_INFO_LIMIT;
|
|||||||
use crate::schema::Schema;
|
use crate::schema::Schema;
|
||||||
use crate::schema::{Field, FieldEntry, FieldType};
|
use crate::schema::{Field, FieldEntry, FieldType};
|
||||||
use crate::termdict::{TermDictionaryBuilder, TermOrdinal};
|
use crate::termdict::{TermDictionaryBuilder, TermOrdinal};
|
||||||
use crate::DocId;
|
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
use crate::{Directory, DocId};
|
||||||
use std::io::{self, Write};
|
use std::io::{self, Write};
|
||||||
|
|
||||||
/// `InvertedIndexSerializer` is in charge of serializing
|
/// `InvertedIndexSerializer` is in charge of serializing
|
||||||
@@ -54,33 +54,36 @@ pub struct InvertedIndexSerializer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl InvertedIndexSerializer {
|
impl InvertedIndexSerializer {
|
||||||
/// Open a new `InvertedIndexSerializer` for the given segment
|
pub(crate) fn for_segment(segment: &mut Segment) -> crate::Result<Self> {
|
||||||
fn create(
|
let schema = segment.schema();
|
||||||
terms_write: CompositeWrite<WritePtr>,
|
use crate::core::SegmentComponent;
|
||||||
postings_write: CompositeWrite<WritePtr>,
|
let terms_wrt = segment.open_write(SegmentComponent::TERMS)?;
|
||||||
positions_write: CompositeWrite<WritePtr>,
|
let postings_wrt = segment.open_write(SegmentComponent::POSTINGS)?;
|
||||||
positionsidx_write: CompositeWrite<WritePtr>,
|
let positions_wrt = segment.open_write(SegmentComponent::POSITIONS)?;
|
||||||
schema: Schema,
|
let positions_idx_wrt = segment.open_write(SegmentComponent::POSITIONSSKIP)?;
|
||||||
) -> Result<InvertedIndexSerializer> {
|
Ok(Self::open(
|
||||||
Ok(InvertedIndexSerializer {
|
|
||||||
terms_write,
|
|
||||||
postings_write,
|
|
||||||
positions_write,
|
|
||||||
positionsidx_write,
|
|
||||||
schema,
|
schema,
|
||||||
})
|
terms_wrt,
|
||||||
|
postings_wrt,
|
||||||
|
positions_wrt,
|
||||||
|
positions_idx_wrt,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Open a new `PostingsSerializer` for the given segment
|
/// Open a new `PostingsSerializer` for the given segment
|
||||||
pub fn open(segment: &mut Segment) -> Result<InvertedIndexSerializer> {
|
pub(crate) fn open(
|
||||||
use crate::SegmentComponent::{POSITIONS, POSITIONSSKIP, POSTINGS, TERMS};
|
schema: Schema,
|
||||||
InvertedIndexSerializer::create(
|
terms_wrt: WritePtr,
|
||||||
CompositeWrite::wrap(segment.open_write(TERMS)?),
|
postings_wrt: WritePtr,
|
||||||
CompositeWrite::wrap(segment.open_write(POSTINGS)?),
|
positions_wrt: WritePtr,
|
||||||
CompositeWrite::wrap(segment.open_write(POSITIONS)?),
|
positions_idx_wrt: WritePtr,
|
||||||
CompositeWrite::wrap(segment.open_write(POSITIONSSKIP)?),
|
) -> InvertedIndexSerializer {
|
||||||
segment.schema(),
|
InvertedIndexSerializer {
|
||||||
)
|
terms_write: CompositeWrite::wrap(terms_wrt),
|
||||||
|
postings_write: CompositeWrite::wrap(postings_wrt),
|
||||||
|
positions_write: CompositeWrite::wrap(positions_wrt),
|
||||||
|
positionsidx_write: CompositeWrite::wrap(positions_idx_wrt),
|
||||||
|
schema,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Must be called before starting pushing terms of
|
/// Must be called before starting pushing terms of
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ use self::compression_snap::{compress, decompress};
|
|||||||
pub mod tests {
|
pub mod tests {
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::directory::{Directory, RAMDirectory, WritePtr};
|
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
|
||||||
use crate::schema::Document;
|
use crate::schema::Document;
|
||||||
use crate::schema::FieldValue;
|
use crate::schema::FieldValue;
|
||||||
use crate::schema::Schema;
|
use crate::schema::Schema;
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
|
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
|
||||||
use crate::core::Index;
|
use crate::core::Index;
|
||||||
use crate::directory::{Directory, RAMDirectory, ReadOnlySource};
|
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, ReadOnlySource};
|
||||||
use crate::postings::TermInfo;
|
use crate::postings::TermInfo;
|
||||||
use crate::schema::{Document, FieldType, Schema, TEXT};
|
use crate::schema::{Document, FieldType, Schema, TEXT};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
use fail;
|
use fail;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory, TerminatingWrite};
|
use tantivy::directory::{
|
||||||
|
Directory, ManagedDirectory, RAMDirectory, ReadOnlyDirectory, TerminatingWrite,
|
||||||
|
};
|
||||||
use tantivy::doc;
|
use tantivy::doc;
|
||||||
use tantivy::schema::{Schema, TEXT};
|
use tantivy::schema::{Schema, TEXT};
|
||||||
use tantivy::{Index, Term};
|
use tantivy::{Index, Term};
|
||||||
|
|||||||
Reference in New Issue
Block a user