mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-23 02:29:57 +00:00
add checksum check in ManagedDirectory (#605)
* add checksum check in ManagedDirectory fix #400 * flush after writing checksum * don't checksum atomic file access and clone managed_paths * implement a footer storing metadata about a file this is more of a poc, it require some refactoring into multiple files `terminate(self)` is implemented, but not used anywhere yet * address comments and simplify things with new contract use BitOrder for integer to raw byte conversion consider atomic write imply atomic read, which might not actually be true use some indirection to have a boxable terminating writer * implement TerminatingWrite and make terminate() be called where it should add dependancy to drop_bomb to help find where terminate() should be called implement TerminatingWrite for wrapper writers make tests pass /!\ some tests seems to pass where they shouldn't * remove usage of drop_bomb * fmt * add test for checksum * address some review comments * update changelog * fmt
This commit is contained in:
committed by
Paul Masurel
parent
7e08e0047b
commit
d8894f0bd2
@@ -8,6 +8,7 @@ Tantivy 0.11.0
|
||||
- Closes #498 - add support for Elastic-style unbounded range queries for alphanumeric types eg. "title:>hello", "weight:>=70.5", "height:<200" (@petr-tik)
|
||||
- API change around `Box<BoxableTokenizer>`. See detail in #629
|
||||
- Avoid rebuilding Regex automaton whenever a regex query is reused. #630 (@brainlock)
|
||||
- Add footer with some metadata to index files. #605 (@fdb-hiroshima)
|
||||
|
||||
## How to update?
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ edition = "2018"
|
||||
[dependencies]
|
||||
base64 = "0.10.0"
|
||||
byteorder = "1.0"
|
||||
crc32fast = "1.2.0"
|
||||
once_cell = "1.0"
|
||||
regex ={version = "1.3.0", default-features = false, features = ["std"]}
|
||||
tantivy-fst = "0.1"
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::common::BinarySerializable;
|
||||
use crate::common::CountingWriter;
|
||||
use crate::common::VInt;
|
||||
use crate::directory::ReadOnlySource;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::directory::{TerminatingWrite, WritePtr};
|
||||
use crate::schema::Field;
|
||||
use crate::space_usage::FieldUsage;
|
||||
use crate::space_usage::PerFieldSpaceUsage;
|
||||
@@ -42,7 +42,7 @@ pub struct CompositeWrite<W = WritePtr> {
|
||||
offsets: HashMap<FileAddr, u64>,
|
||||
}
|
||||
|
||||
impl<W: Write> CompositeWrite<W> {
|
||||
impl<W: TerminatingWrite + Write> CompositeWrite<W> {
|
||||
/// Crate a new API writer that writes a composite file
|
||||
/// in a given write.
|
||||
pub fn wrap(w: W) -> CompositeWrite<W> {
|
||||
@@ -91,8 +91,7 @@ impl<W: Write> CompositeWrite<W> {
|
||||
|
||||
let footer_len = (self.write.written_bytes() - footer_offset) as u32;
|
||||
footer_len.serialize(&mut self.write)?;
|
||||
self.write.flush()?;
|
||||
Ok(())
|
||||
self.write.terminate()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use crate::directory::AntiCallToken;
|
||||
use crate::directory::TerminatingWrite;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
@@ -42,6 +44,13 @@ impl<W: Write> Write for CountingWriter<W> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
|
||||
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()?;
|
||||
self.underlying.terminate_ref(token)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
|
||||
@@ -26,9 +26,10 @@ use crate::IndexWriter;
|
||||
use crate::Result;
|
||||
use num_cpus;
|
||||
use std::borrow::BorrowMut;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
#[cfg(feature = "mmap")]
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
fn load_metas(directory: &dyn Directory, inventory: &SegmentMetaInventory) -> Result<IndexMeta> {
|
||||
@@ -368,6 +369,11 @@ impl Index {
|
||||
.map(SegmentMeta::id)
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Returns the set of corrupted files
|
||||
pub fn validate_checksum(&self) -> Result<HashSet<PathBuf>> {
|
||||
self.directory.list_damaged().map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Index {
|
||||
|
||||
@@ -118,6 +118,8 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
///
|
||||
/// Specifically, subsequent writes or flushes should
|
||||
/// have no effect on the returned `ReadOnlySource` object.
|
||||
///
|
||||
/// You should only use this to read files create with [`open_write`]
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError>;
|
||||
|
||||
/// Removes a file
|
||||
@@ -157,6 +159,8 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
/// atomic_write.
|
||||
///
|
||||
/// This should only be used for small files.
|
||||
///
|
||||
/// You should only use this to read files create with [`atomic_write`]
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
|
||||
|
||||
/// Atomically replace the content of a file with data.
|
||||
|
||||
213
src/directory/footer.rs
Normal file
213
src/directory/footer.rs
Normal file
@@ -0,0 +1,213 @@
|
||||
use crate::directory::read_only_source::ReadOnlySource;
|
||||
use crate::directory::{AntiCallToken, TerminatingWrite};
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use crc32fast::Hasher;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
const COMMON_FOOTER_SIZE: usize = 4 * 5;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct Footer {
|
||||
pub tantivy_version: (u32, u32, u32),
|
||||
pub meta: String,
|
||||
pub versioned_footer: VersionedFooter,
|
||||
}
|
||||
|
||||
impl Footer {
|
||||
pub fn new(versioned_footer: VersionedFooter) -> Self {
|
||||
let tantivy_version = (
|
||||
env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
|
||||
env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
|
||||
env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(),
|
||||
);
|
||||
Footer {
|
||||
tantivy_version,
|
||||
meta: format!(
|
||||
"tantivy {}.{}.{}, index v{}",
|
||||
tantivy_version.0,
|
||||
tantivy_version.1,
|
||||
tantivy_version.2,
|
||||
versioned_footer.version()
|
||||
),
|
||||
versioned_footer,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_bytes(&self) -> Vec<u8> {
|
||||
let mut res = self.versioned_footer.to_bytes();
|
||||
res.extend_from_slice(self.meta.as_bytes());
|
||||
let len = res.len();
|
||||
res.resize(len + COMMON_FOOTER_SIZE, 0);
|
||||
let mut common_footer = &mut res[len..];
|
||||
LittleEndian::write_u32(&mut common_footer, self.meta.len() as u32);
|
||||
LittleEndian::write_u32(&mut common_footer[4..], self.tantivy_version.0);
|
||||
LittleEndian::write_u32(&mut common_footer[8..], self.tantivy_version.1);
|
||||
LittleEndian::write_u32(&mut common_footer[12..], self.tantivy_version.2);
|
||||
LittleEndian::write_u32(&mut common_footer[16..], (len + COMMON_FOOTER_SIZE) as u32);
|
||||
res
|
||||
}
|
||||
|
||||
pub fn from_bytes(data: &[u8]) -> Result<Self, io::Error> {
|
||||
let len = data.len();
|
||||
if len < COMMON_FOOTER_SIZE + 4 {
|
||||
// 4 bytes for index version, stored in versioned footer
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
format!("File corrupted. The footer len must be over 24, while the entire file len is {}", len)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
let size = LittleEndian::read_u32(&data[len - 4..]) as usize;
|
||||
if len < size as usize {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
format!(
|
||||
"File corrupted. The footer len is {}, while the entire file len is {}",
|
||||
size, len
|
||||
),
|
||||
));
|
||||
}
|
||||
let footer = &data[len - size as usize..];
|
||||
let meta_len = LittleEndian::read_u32(&footer[size - 20..]) as usize;
|
||||
let tantivy_major = LittleEndian::read_u32(&footer[size - 16..]);
|
||||
let tantivy_minor = LittleEndian::read_u32(&footer[size - 12..]);
|
||||
let tantivy_patch = LittleEndian::read_u32(&footer[size - 8..]);
|
||||
Ok(Footer {
|
||||
tantivy_version: (tantivy_major, tantivy_minor, tantivy_patch),
|
||||
meta: String::from_utf8_lossy(&footer[size - meta_len - 20..size - 20]).into_owned(),
|
||||
versioned_footer: VersionedFooter::from_bytes(&footer[..size - meta_len - 20])?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn extract_footer(source: ReadOnlySource) -> Result<(Footer, ReadOnlySource), io::Error> {
|
||||
let footer = Footer::from_bytes(source.as_slice())?;
|
||||
let reader = source.slice_to(source.as_slice().len() - footer.size());
|
||||
Ok((footer, reader))
|
||||
}
|
||||
|
||||
pub fn size(&self) -> usize {
|
||||
self.versioned_footer.size() as usize + self.meta.len() + 20
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum VersionedFooter {
|
||||
UnknownVersion { version: u32, size: u32 },
|
||||
V0(u32), // crc
|
||||
}
|
||||
|
||||
impl VersionedFooter {
|
||||
pub fn to_bytes(&self) -> Vec<u8> {
|
||||
match self {
|
||||
Self::V0(crc) => {
|
||||
let mut res = vec![0; 8];
|
||||
LittleEndian::write_u32(&mut res, 0);
|
||||
LittleEndian::write_u32(&mut res[4..], *crc);
|
||||
res
|
||||
}
|
||||
Self::UnknownVersion { .. } => {
|
||||
panic!("Unsupported index should never get serialized");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_bytes(footer: &[u8]) -> Result<Self, io::Error> {
|
||||
assert!(footer.len() >= 4);
|
||||
let version = LittleEndian::read_u32(footer);
|
||||
match version {
|
||||
0 => {
|
||||
if footer.len() == 8 {
|
||||
Ok(Self::V0(LittleEndian::read_u32(&footer[4..])))
|
||||
} else {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
format!(
|
||||
"File corrupted. The versioned footer len is {}, while it should be 8",
|
||||
footer.len()
|
||||
),
|
||||
))
|
||||
}
|
||||
}
|
||||
version => Ok(Self::UnknownVersion {
|
||||
version,
|
||||
size: footer.len() as u32,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u32 {
|
||||
match self {
|
||||
Self::V0(_) => 8,
|
||||
Self::UnknownVersion { size, .. } => *size,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn version(&self) -> u32 {
|
||||
match self {
|
||||
Self::V0(_) => 0,
|
||||
Self::UnknownVersion { version, .. } => *version,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn crc(&self) -> Option<u32> {
|
||||
match self {
|
||||
Self::V0(crc) => Some(*crc),
|
||||
Self::UnknownVersion { .. } => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct FooterProxy<W: TerminatingWrite> {
|
||||
/// always Some except after terminate call
|
||||
hasher: Option<Hasher>,
|
||||
/// always Some except after terminate call
|
||||
writer: Option<W>,
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite> FooterProxy<W> {
|
||||
pub fn new(writer: W) -> Self {
|
||||
FooterProxy {
|
||||
hasher: Some(Hasher::new()),
|
||||
writer: Some(writer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite> Write for FooterProxy<W> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let count = self.writer.as_mut().unwrap().write(buf)?;
|
||||
self.hasher.as_mut().unwrap().update(&buf[..count]);
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.writer.as_mut().unwrap().flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
|
||||
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
|
||||
let crc = self.hasher.take().unwrap().finalize();
|
||||
|
||||
let footer = Footer::new(VersionedFooter::V0(crc)).to_bytes();
|
||||
let mut writer = self.writer.take().unwrap();
|
||||
writer.write_all(&footer)?;
|
||||
writer.terminate()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::directory::footer::{Footer, VersionedFooter};
|
||||
|
||||
#[test]
|
||||
fn test_serialize_deserialize_footer() {
|
||||
let crc = 123456;
|
||||
let footer = Footer::new(VersionedFooter::V0(crc));
|
||||
let footer_bytes = footer.to_bytes();
|
||||
|
||||
assert_eq!(Footer::from_bytes(&footer_bytes).unwrap(), footer);
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::core::MANAGED_FILEPATH;
|
||||
use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError};
|
||||
use crate::directory::footer::{Footer, FooterProxy};
|
||||
use crate::directory::DirectoryLock;
|
||||
use crate::directory::Lock;
|
||||
use crate::directory::META_LOCK;
|
||||
@@ -8,6 +9,7 @@ use crate::directory::{WatchCallback, WatchHandle};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::Directory;
|
||||
use crate::Result;
|
||||
use crc32fast::Hasher;
|
||||
use serde_json;
|
||||
use std::collections::HashSet;
|
||||
use std::io;
|
||||
@@ -207,17 +209,59 @@ impl ManagedDirectory {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Verify checksum of a managed file
|
||||
pub fn validate_checksum(&self, path: &Path) -> result::Result<bool, OpenReadError> {
|
||||
let reader = self.directory.open_read(path)?;
|
||||
let (footer, data) = Footer::extract_footer(reader)
|
||||
.map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
|
||||
let mut hasher = Hasher::new();
|
||||
hasher.update(data.as_slice());
|
||||
let crc = hasher.finalize();
|
||||
Ok(footer
|
||||
.versioned_footer
|
||||
.crc()
|
||||
.map(|v| v == crc)
|
||||
.unwrap_or(false))
|
||||
}
|
||||
|
||||
/// List files for which checksum does not match content
|
||||
pub fn list_damaged(&self) -> result::Result<HashSet<PathBuf>, OpenReadError> {
|
||||
let mut hashset = HashSet::new();
|
||||
let managed_paths = self
|
||||
.meta_informations
|
||||
.read()
|
||||
.expect("Managed directory rlock poisoned in list damaged.")
|
||||
.managed_paths
|
||||
.clone();
|
||||
|
||||
for path in managed_paths.into_iter() {
|
||||
if !self.validate_checksum(&path)? {
|
||||
hashset.insert(path);
|
||||
}
|
||||
}
|
||||
Ok(hashset)
|
||||
}
|
||||
}
|
||||
|
||||
impl Directory for ManagedDirectory {
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||
self.directory.open_read(path)
|
||||
let read_only_source = self.directory.open_read(path)?;
|
||||
let (_footer, reader) = Footer::extract_footer(read_only_source)
|
||||
.map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
|
||||
self.register_file_as_managed(path)
|
||||
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
|
||||
self.directory.open_write(path)
|
||||
Ok(io::BufWriter::new(Box::new(FooterProxy::new(
|
||||
self.directory
|
||||
.open_write(path)?
|
||||
.into_inner()
|
||||
.map_err(|_| ())
|
||||
.expect("buffer should be empty"),
|
||||
))))
|
||||
}
|
||||
|
||||
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
|
||||
@@ -259,8 +303,9 @@ impl Clone for ManagedDirectory {
|
||||
#[cfg(test)]
|
||||
mod tests_mmap_specific {
|
||||
|
||||
use crate::directory::{Directory, ManagedDirectory, MmapDirectory};
|
||||
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};
|
||||
use std::collections::HashSet;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tempfile::TempDir;
|
||||
@@ -275,8 +320,8 @@ mod tests_mmap_specific {
|
||||
{
|
||||
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
|
||||
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
|
||||
let mut write_file = managed_directory.open_write(test_path1).unwrap();
|
||||
write_file.flush().unwrap();
|
||||
let write_file = managed_directory.open_write(test_path1).unwrap();
|
||||
write_file.terminate().unwrap();
|
||||
managed_directory
|
||||
.atomic_write(test_path2, &[0u8, 1u8])
|
||||
.unwrap();
|
||||
@@ -310,9 +355,9 @@ mod tests_mmap_specific {
|
||||
|
||||
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
|
||||
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
|
||||
managed_directory
|
||||
.atomic_write(test_path1, &vec![0u8, 1u8])
|
||||
.unwrap();
|
||||
let mut write = managed_directory.open_write(test_path1).unwrap();
|
||||
write.write_all(&[0u8, 1u8]).unwrap();
|
||||
write.terminate().unwrap();
|
||||
assert!(managed_directory.exists(test_path1));
|
||||
|
||||
let _mmap_read = managed_directory.open_read(test_path1).unwrap();
|
||||
@@ -331,4 +376,38 @@ mod tests_mmap_specific {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_checksum() {
|
||||
let test_path1: &'static Path = Path::new("some_path_for_test");
|
||||
let test_path2: &'static Path = Path::new("other_test_path");
|
||||
|
||||
let tempdir = TempDir::new().unwrap();
|
||||
let tempdir_path = PathBuf::from(tempdir.path());
|
||||
|
||||
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
|
||||
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
|
||||
let mut write = managed_directory.open_write(test_path1).unwrap();
|
||||
write.write_all(&[0u8, 1u8]).unwrap();
|
||||
write.terminate().unwrap();
|
||||
|
||||
let mut write = managed_directory.open_write(test_path2).unwrap();
|
||||
write.write_all(&[3u8, 4u8, 5u8]).unwrap();
|
||||
write.terminate().unwrap();
|
||||
|
||||
assert!(managed_directory.list_damaged().unwrap().is_empty());
|
||||
|
||||
let mut corrupted_path = tempdir_path.clone();
|
||||
corrupted_path.push(test_path2);
|
||||
let mut file = OpenOptions::new()
|
||||
.write(true)
|
||||
.open(&corrupted_path)
|
||||
.unwrap();
|
||||
file.write_all(&[255u8]).unwrap();
|
||||
file.flush().unwrap();
|
||||
drop(file);
|
||||
|
||||
let damaged = managed_directory.list_damaged().unwrap();
|
||||
assert_eq!(damaged.len(), 1);
|
||||
assert!(damaged.contains(test_path2));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::directory::error::{
|
||||
DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError,
|
||||
};
|
||||
use crate::directory::read_only_source::BoxedData;
|
||||
use crate::directory::AntiCallToken;
|
||||
use crate::directory::Directory;
|
||||
use crate::directory::DirectoryLock;
|
||||
use crate::directory::Lock;
|
||||
@@ -18,7 +19,7 @@ use crate::directory::ReadOnlySource;
|
||||
use crate::directory::WatchCallback;
|
||||
use crate::directory::WatchCallbackList;
|
||||
use crate::directory::WatchHandle;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::directory::{TerminatingWrite, WritePtr};
|
||||
use atomicwrites;
|
||||
use memmap::Mmap;
|
||||
use std::collections::HashMap;
|
||||
@@ -412,6 +413,12 @@ impl Seek for SafeFileWriter {
|
||||
}
|
||||
}
|
||||
|
||||
impl TerminatingWrite for SafeFileWriter {
|
||||
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl Directory for MmapDirectory {
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
|
||||
debug!("Open Read {:?}", path);
|
||||
|
||||
@@ -9,6 +9,7 @@ mod mmap_directory;
|
||||
|
||||
mod directory;
|
||||
mod directory_lock;
|
||||
mod footer;
|
||||
mod managed_directory;
|
||||
mod ram_directory;
|
||||
mod read_only_source;
|
||||
@@ -24,18 +25,49 @@ pub use self::ram_directory::RAMDirectory;
|
||||
pub use self::read_only_source::ReadOnlySource;
|
||||
pub(crate) use self::watch_event_router::WatchCallbackList;
|
||||
pub use self::watch_event_router::{WatchCallback, WatchHandle};
|
||||
use std::io::{BufWriter, Write};
|
||||
use std::io::{self, BufWriter, Write};
|
||||
|
||||
#[cfg(feature = "mmap")]
|
||||
pub use self::mmap_directory::MmapDirectory;
|
||||
|
||||
pub use self::managed_directory::ManagedDirectory;
|
||||
|
||||
/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly
|
||||
pub struct AntiCallToken(());
|
||||
|
||||
/// Trait used to indicate when no more write need to be done on a writer
|
||||
pub trait TerminatingWrite: Write {
|
||||
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
|
||||
fn terminate(mut self) -> io::Result<()>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
self.terminate_ref(AntiCallToken(()))
|
||||
}
|
||||
|
||||
/// You should implement this function to define custom behavior.
|
||||
/// This function should flush any buffer it may hold.
|
||||
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>;
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite + ?Sized> TerminatingWrite for Box<W> {
|
||||
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
|
||||
self.as_mut().terminate_ref(token)
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
|
||||
fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()?;
|
||||
self.get_mut().terminate_ref(a)
|
||||
}
|
||||
}
|
||||
|
||||
/// Write object for Directory.
|
||||
///
|
||||
/// `WritePtr` are required to implement both Write
|
||||
/// and Seek.
|
||||
pub type WritePtr = BufWriter<Box<dyn Write>>;
|
||||
pub type WritePtr = BufWriter<Box<dyn TerminatingWrite>>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
use crate::core::META_FILEPATH;
|
||||
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
|
||||
use crate::directory::AntiCallToken;
|
||||
use crate::directory::WatchCallbackList;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle};
|
||||
use crate::directory::{TerminatingWrite, WritePtr};
|
||||
use fail::fail_point;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
@@ -71,6 +72,12 @@ impl Write for VecWriter {
|
||||
}
|
||||
}
|
||||
|
||||
impl TerminatingWrite for VecWriter {
|
||||
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct InnerDirectory {
|
||||
fs: HashMap<PathBuf, ReadOnlySource>,
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::core::SegmentReader;
|
||||
use crate::directory::DirectoryLock;
|
||||
use crate::directory::TerminatingWrite;
|
||||
use crate::docset::DocSet;
|
||||
use crate::error::TantivyError;
|
||||
use crate::fastfield::write_delete_bitset;
|
||||
@@ -168,6 +169,7 @@ pub(crate) fn advance_deletes(
|
||||
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
|
||||
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
|
||||
write_delete_bitset(&delete_bitset, &mut delete_file)?;
|
||||
delete_file.terminate()?;
|
||||
}
|
||||
}
|
||||
segment_entry.set_meta(segment.meta().clone());
|
||||
|
||||
@@ -3,6 +3,7 @@ use super::skiplist::SkipListBuilder;
|
||||
use super::StoreReader;
|
||||
use crate::common::CountingWriter;
|
||||
use crate::common::{BinarySerializable, VInt};
|
||||
use crate::directory::TerminatingWrite;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::schema::Document;
|
||||
use crate::DocId;
|
||||
@@ -109,6 +110,6 @@ impl StoreWriter {
|
||||
self.offset_index_writer.write(&mut self.writer)?;
|
||||
header_offset.serialize(&mut self.writer)?;
|
||||
self.doc.serialize(&mut self.writer)?;
|
||||
self.writer.flush()
|
||||
self.writer.terminate()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use fail;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory};
|
||||
use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory, TerminatingWrite};
|
||||
use tantivy::doc;
|
||||
use tantivy::schema::{Schema, TEXT};
|
||||
use tantivy::{Index, Term};
|
||||
@@ -17,7 +17,7 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
|
||||
managed_directory
|
||||
.open_write(test_path)
|
||||
.unwrap()
|
||||
.flush()
|
||||
.terminate()
|
||||
.unwrap();
|
||||
assert!(managed_directory.exists(test_path));
|
||||
// triggering gc and setting the delete operation to fail.
|
||||
|
||||
Reference in New Issue
Block a user