diff --git a/CHANGELOG.md b/CHANGELOG.md index fc94b203b..cf07ff4ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Tantivy 0.11.0 - Closes #498 - add support for Elastic-style unbounded range queries for alphanumeric types eg. "title:>hello", "weight:>=70.5", "height:<200" (@petr-tik) - API change around `Box`. See detail in #629 - Avoid rebuilding Regex automaton whenever a regex query is reused. #630 (@brainlock) +- Add footer with some metadata to index files. #605 (@fdb-hiroshima) ## How to update? diff --git a/Cargo.toml b/Cargo.toml index e05fafa6c..6528cf63d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ edition = "2018" [dependencies] base64 = "0.10.0" byteorder = "1.0" +crc32fast = "1.2.0" once_cell = "1.0" regex ={version = "1.3.0", default-features = false, features = ["std"]} tantivy-fst = "0.1" diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs index f2c2d2208..f2d2926f4 100644 --- a/src/common/composite_file.rs +++ b/src/common/composite_file.rs @@ -2,7 +2,7 @@ use crate::common::BinarySerializable; use crate::common::CountingWriter; use crate::common::VInt; use crate::directory::ReadOnlySource; -use crate::directory::WritePtr; +use crate::directory::{TerminatingWrite, WritePtr}; use crate::schema::Field; use crate::space_usage::FieldUsage; use crate::space_usage::PerFieldSpaceUsage; @@ -42,7 +42,7 @@ pub struct CompositeWrite { offsets: HashMap, } -impl CompositeWrite { +impl CompositeWrite { /// Crate a new API writer that writes a composite file /// in a given write. pub fn wrap(w: W) -> CompositeWrite { @@ -91,8 +91,7 @@ impl CompositeWrite { let footer_len = (self.write.written_bytes() - footer_offset) as u32; footer_len.serialize(&mut self.write)?; - self.write.flush()?; - Ok(()) + self.write.terminate() } } diff --git a/src/common/counting_writer.rs b/src/common/counting_writer.rs index 339c60bec..8293ba8b3 100644 --- a/src/common/counting_writer.rs +++ b/src/common/counting_writer.rs @@ -1,3 +1,5 @@ +use crate::directory::AntiCallToken; +use crate::directory::TerminatingWrite; use std::io; use std::io::Write; @@ -42,6 +44,13 @@ impl Write for CountingWriter { } } +impl TerminatingWrite for CountingWriter { + fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> { + self.flush()?; + self.underlying.terminate_ref(token) + } +} + #[cfg(test)] mod test { diff --git a/src/core/index.rs b/src/core/index.rs index 50d184df2..ac0bea361 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -26,9 +26,10 @@ use crate::IndexWriter; use crate::Result; use num_cpus; use std::borrow::BorrowMut; +use std::collections::HashSet; use std::fmt; #[cfg(feature = "mmap")] -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; fn load_metas(directory: &dyn Directory, inventory: &SegmentMetaInventory) -> Result { @@ -368,6 +369,11 @@ impl Index { .map(SegmentMeta::id) .collect()) } + + /// Returns the set of corrupted files + pub fn validate_checksum(&self) -> Result> { + self.directory.list_damaged().map_err(Into::into) + } } impl fmt::Debug for Index { diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 9da1cb217..6b5092542 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -118,6 +118,8 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// /// Specifically, subsequent writes or flushes should /// have no effect on the returned `ReadOnlySource` object. + /// + /// You should only use this to read files create with [`open_write`] fn open_read(&self, path: &Path) -> result::Result; /// Removes a file @@ -157,6 +159,8 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// atomic_write. /// /// This should only be used for small files. + /// + /// You should only use this to read files create with [`atomic_write`] fn atomic_read(&self, path: &Path) -> Result, OpenReadError>; /// Atomically replace the content of a file with data. diff --git a/src/directory/footer.rs b/src/directory/footer.rs new file mode 100644 index 000000000..bc4601eaa --- /dev/null +++ b/src/directory/footer.rs @@ -0,0 +1,213 @@ +use crate::directory::read_only_source::ReadOnlySource; +use crate::directory::{AntiCallToken, TerminatingWrite}; +use byteorder::{ByteOrder, LittleEndian}; +use crc32fast::Hasher; +use std::io; +use std::io::Write; + +const COMMON_FOOTER_SIZE: usize = 4 * 5; + +#[derive(Debug, Clone, PartialEq)] +pub struct Footer { + pub tantivy_version: (u32, u32, u32), + pub meta: String, + pub versioned_footer: VersionedFooter, +} + +impl Footer { + pub fn new(versioned_footer: VersionedFooter) -> Self { + let tantivy_version = ( + env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(), + ); + Footer { + tantivy_version, + meta: format!( + "tantivy {}.{}.{}, index v{}", + tantivy_version.0, + tantivy_version.1, + tantivy_version.2, + versioned_footer.version() + ), + versioned_footer, + } + } + + pub fn to_bytes(&self) -> Vec { + let mut res = self.versioned_footer.to_bytes(); + res.extend_from_slice(self.meta.as_bytes()); + let len = res.len(); + res.resize(len + COMMON_FOOTER_SIZE, 0); + let mut common_footer = &mut res[len..]; + LittleEndian::write_u32(&mut common_footer, self.meta.len() as u32); + LittleEndian::write_u32(&mut common_footer[4..], self.tantivy_version.0); + LittleEndian::write_u32(&mut common_footer[8..], self.tantivy_version.1); + LittleEndian::write_u32(&mut common_footer[12..], self.tantivy_version.2); + LittleEndian::write_u32(&mut common_footer[16..], (len + COMMON_FOOTER_SIZE) as u32); + res + } + + pub fn from_bytes(data: &[u8]) -> Result { + let len = data.len(); + if len < COMMON_FOOTER_SIZE + 4 { + // 4 bytes for index version, stored in versioned footer + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!("File corrupted. The footer len must be over 24, while the entire file len is {}", len) + ) + ); + } + + let size = LittleEndian::read_u32(&data[len - 4..]) as usize; + if len < size as usize { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "File corrupted. The footer len is {}, while the entire file len is {}", + size, len + ), + )); + } + let footer = &data[len - size as usize..]; + let meta_len = LittleEndian::read_u32(&footer[size - 20..]) as usize; + let tantivy_major = LittleEndian::read_u32(&footer[size - 16..]); + let tantivy_minor = LittleEndian::read_u32(&footer[size - 12..]); + let tantivy_patch = LittleEndian::read_u32(&footer[size - 8..]); + Ok(Footer { + tantivy_version: (tantivy_major, tantivy_minor, tantivy_patch), + meta: String::from_utf8_lossy(&footer[size - meta_len - 20..size - 20]).into_owned(), + versioned_footer: VersionedFooter::from_bytes(&footer[..size - meta_len - 20])?, + }) + } + + pub fn extract_footer(source: ReadOnlySource) -> Result<(Footer, ReadOnlySource), io::Error> { + let footer = Footer::from_bytes(source.as_slice())?; + let reader = source.slice_to(source.as_slice().len() - footer.size()); + Ok((footer, reader)) + } + + pub fn size(&self) -> usize { + self.versioned_footer.size() as usize + self.meta.len() + 20 + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum VersionedFooter { + UnknownVersion { version: u32, size: u32 }, + V0(u32), // crc +} + +impl VersionedFooter { + pub fn to_bytes(&self) -> Vec { + match self { + Self::V0(crc) => { + let mut res = vec![0; 8]; + LittleEndian::write_u32(&mut res, 0); + LittleEndian::write_u32(&mut res[4..], *crc); + res + } + Self::UnknownVersion { .. } => { + panic!("Unsupported index should never get serialized"); + } + } + } + + pub fn from_bytes(footer: &[u8]) -> Result { + assert!(footer.len() >= 4); + let version = LittleEndian::read_u32(footer); + match version { + 0 => { + if footer.len() == 8 { + Ok(Self::V0(LittleEndian::read_u32(&footer[4..]))) + } else { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "File corrupted. The versioned footer len is {}, while it should be 8", + footer.len() + ), + )) + } + } + version => Ok(Self::UnknownVersion { + version, + size: footer.len() as u32, + }), + } + } + + pub fn size(&self) -> u32 { + match self { + Self::V0(_) => 8, + Self::UnknownVersion { size, .. } => *size, + } + } + + pub fn version(&self) -> u32 { + match self { + Self::V0(_) => 0, + Self::UnknownVersion { version, .. } => *version, + } + } + + pub fn crc(&self) -> Option { + match self { + Self::V0(crc) => Some(*crc), + Self::UnknownVersion { .. } => None, + } + } +} + +pub(crate) struct FooterProxy { + /// always Some except after terminate call + hasher: Option, + /// always Some except after terminate call + writer: Option, +} + +impl FooterProxy { + pub fn new(writer: W) -> Self { + FooterProxy { + hasher: Some(Hasher::new()), + writer: Some(writer), + } + } +} + +impl Write for FooterProxy { + fn write(&mut self, buf: &[u8]) -> io::Result { + let count = self.writer.as_mut().unwrap().write(buf)?; + self.hasher.as_mut().unwrap().update(&buf[..count]); + Ok(count) + } + + fn flush(&mut self) -> io::Result<()> { + self.writer.as_mut().unwrap().flush() + } +} + +impl TerminatingWrite for FooterProxy { + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { + let crc = self.hasher.take().unwrap().finalize(); + + let footer = Footer::new(VersionedFooter::V0(crc)).to_bytes(); + let mut writer = self.writer.take().unwrap(); + writer.write_all(&footer)?; + writer.terminate() + } +} + +#[cfg(test)] +mod tests { + use crate::directory::footer::{Footer, VersionedFooter}; + + #[test] + fn test_serialize_deserialize_footer() { + let crc = 123456; + let footer = Footer::new(VersionedFooter::V0(crc)); + let footer_bytes = footer.to_bytes(); + + assert_eq!(Footer::from_bytes(&footer_bytes).unwrap(), footer); + } +} diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 859e66d51..f72668fa1 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -1,5 +1,6 @@ use crate::core::MANAGED_FILEPATH; use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError}; +use crate::directory::footer::{Footer, FooterProxy}; use crate::directory::DirectoryLock; use crate::directory::Lock; use crate::directory::META_LOCK; @@ -8,6 +9,7 @@ use crate::directory::{WatchCallback, WatchHandle}; use crate::error::DataCorruption; use crate::Directory; use crate::Result; +use crc32fast::Hasher; use serde_json; use std::collections::HashSet; use std::io; @@ -207,17 +209,59 @@ impl ManagedDirectory { } Ok(()) } + + /// Verify checksum of a managed file + pub fn validate_checksum(&self, path: &Path) -> result::Result { + let reader = self.directory.open_read(path)?; + let (footer, data) = Footer::extract_footer(reader) + .map_err(|err| IOError::with_path(path.to_path_buf(), err))?; + let mut hasher = Hasher::new(); + hasher.update(data.as_slice()); + let crc = hasher.finalize(); + Ok(footer + .versioned_footer + .crc() + .map(|v| v == crc) + .unwrap_or(false)) + } + + /// List files for which checksum does not match content + pub fn list_damaged(&self) -> result::Result, OpenReadError> { + let mut hashset = HashSet::new(); + let managed_paths = self + .meta_informations + .read() + .expect("Managed directory rlock poisoned in list damaged.") + .managed_paths + .clone(); + + for path in managed_paths.into_iter() { + if !self.validate_checksum(&path)? { + hashset.insert(path); + } + } + Ok(hashset) + } } impl Directory for ManagedDirectory { fn open_read(&self, path: &Path) -> result::Result { - self.directory.open_read(path) + let read_only_source = self.directory.open_read(path)?; + let (_footer, reader) = Footer::extract_footer(read_only_source) + .map_err(|err| IOError::with_path(path.to_path_buf(), err))?; + Ok(reader) } fn open_write(&mut self, path: &Path) -> result::Result { self.register_file_as_managed(path) .map_err(|e| IOError::with_path(path.to_owned(), e))?; - self.directory.open_write(path) + Ok(io::BufWriter::new(Box::new(FooterProxy::new( + self.directory + .open_write(path)? + .into_inner() + .map_err(|_| ()) + .expect("buffer should be empty"), + )))) } fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { @@ -259,8 +303,9 @@ impl Clone for ManagedDirectory { #[cfg(test)] mod tests_mmap_specific { - use crate::directory::{Directory, ManagedDirectory, MmapDirectory}; + use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite}; use std::collections::HashSet; + use std::fs::OpenOptions; use std::io::Write; use std::path::{Path, PathBuf}; use tempfile::TempDir; @@ -275,8 +320,8 @@ mod tests_mmap_specific { { let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap(); let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap(); - let mut write_file = managed_directory.open_write(test_path1).unwrap(); - write_file.flush().unwrap(); + let write_file = managed_directory.open_write(test_path1).unwrap(); + write_file.terminate().unwrap(); managed_directory .atomic_write(test_path2, &[0u8, 1u8]) .unwrap(); @@ -310,9 +355,9 @@ mod tests_mmap_specific { let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap(); let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap(); - managed_directory - .atomic_write(test_path1, &vec![0u8, 1u8]) - .unwrap(); + let mut write = managed_directory.open_write(test_path1).unwrap(); + write.write_all(&[0u8, 1u8]).unwrap(); + write.terminate().unwrap(); assert!(managed_directory.exists(test_path1)); let _mmap_read = managed_directory.open_read(test_path1).unwrap(); @@ -331,4 +376,38 @@ mod tests_mmap_specific { } } + #[test] + fn test_checksum() { + let test_path1: &'static Path = Path::new("some_path_for_test"); + let test_path2: &'static Path = Path::new("other_test_path"); + + let tempdir = TempDir::new().unwrap(); + let tempdir_path = PathBuf::from(tempdir.path()); + + let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap(); + let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap(); + let mut write = managed_directory.open_write(test_path1).unwrap(); + write.write_all(&[0u8, 1u8]).unwrap(); + write.terminate().unwrap(); + + let mut write = managed_directory.open_write(test_path2).unwrap(); + write.write_all(&[3u8, 4u8, 5u8]).unwrap(); + write.terminate().unwrap(); + + assert!(managed_directory.list_damaged().unwrap().is_empty()); + + let mut corrupted_path = tempdir_path.clone(); + corrupted_path.push(test_path2); + let mut file = OpenOptions::new() + .write(true) + .open(&corrupted_path) + .unwrap(); + file.write_all(&[255u8]).unwrap(); + file.flush().unwrap(); + drop(file); + + let damaged = managed_directory.list_damaged().unwrap(); + assert_eq!(damaged.len(), 1); + assert!(damaged.contains(test_path2)); + } } diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index cfb1e873d..fc3898cb2 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -11,6 +11,7 @@ use crate::directory::error::{ DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError, }; use crate::directory::read_only_source::BoxedData; +use crate::directory::AntiCallToken; use crate::directory::Directory; use crate::directory::DirectoryLock; use crate::directory::Lock; @@ -18,7 +19,7 @@ use crate::directory::ReadOnlySource; use crate::directory::WatchCallback; use crate::directory::WatchCallbackList; use crate::directory::WatchHandle; -use crate::directory::WritePtr; +use crate::directory::{TerminatingWrite, WritePtr}; use atomicwrites; use memmap::Mmap; use std::collections::HashMap; @@ -412,6 +413,12 @@ impl Seek for SafeFileWriter { } } +impl TerminatingWrite for SafeFileWriter { + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { + self.flush() + } +} + impl Directory for MmapDirectory { fn open_read(&self, path: &Path) -> result::Result { debug!("Open Read {:?}", path); diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 70fa01348..294beb9f0 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -9,6 +9,7 @@ mod mmap_directory; mod directory; mod directory_lock; +mod footer; mod managed_directory; mod ram_directory; mod read_only_source; @@ -24,18 +25,49 @@ pub use self::ram_directory::RAMDirectory; pub use self::read_only_source::ReadOnlySource; pub(crate) use self::watch_event_router::WatchCallbackList; pub use self::watch_event_router::{WatchCallback, WatchHandle}; -use std::io::{BufWriter, Write}; +use std::io::{self, BufWriter, Write}; #[cfg(feature = "mmap")] pub use self::mmap_directory::MmapDirectory; pub use self::managed_directory::ManagedDirectory; +/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly +pub struct AntiCallToken(()); + +/// Trait used to indicate when no more write need to be done on a writer +pub trait TerminatingWrite: Write { + /// Indicate that the writer will no longer be used. Internally call terminate_ref. + fn terminate(mut self) -> io::Result<()> + where + Self: Sized, + { + self.terminate_ref(AntiCallToken(())) + } + + /// You should implement this function to define custom behavior. + /// This function should flush any buffer it may hold. + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>; +} + +impl TerminatingWrite for Box { + fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> { + self.as_mut().terminate_ref(token) + } +} + +impl TerminatingWrite for BufWriter { + fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> { + self.flush()?; + self.get_mut().terminate_ref(a) + } +} + /// Write object for Directory. /// /// `WritePtr` are required to implement both Write /// and Seek. -pub type WritePtr = BufWriter>; +pub type WritePtr = BufWriter>; #[cfg(test)] mod tests; diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 8c6d237dc..db19f9811 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -1,8 +1,9 @@ use crate::core::META_FILEPATH; use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError}; +use crate::directory::AntiCallToken; use crate::directory::WatchCallbackList; -use crate::directory::WritePtr; use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle}; +use crate::directory::{TerminatingWrite, WritePtr}; use fail::fail_point; use std::collections::HashMap; use std::fmt; @@ -71,6 +72,12 @@ impl Write for VecWriter { } } +impl TerminatingWrite for VecWriter { + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { + self.flush() + } +} + #[derive(Default)] struct InnerDirectory { fs: HashMap, diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 917b90fab..bf5c4365d 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -8,6 +8,7 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::core::SegmentReader; use crate::directory::DirectoryLock; +use crate::directory::TerminatingWrite; use crate::docset::DocSet; use crate::error::TantivyError; use crate::fastfield::write_delete_bitset; @@ -168,6 +169,7 @@ pub(crate) fn advance_deletes( segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp); let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; write_delete_bitset(&delete_bitset, &mut delete_file)?; + delete_file.terminate()?; } } segment_entry.set_meta(segment.meta().clone()); diff --git a/src/store/writer.rs b/src/store/writer.rs index bcb74f99d..5ddda2c7f 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -3,6 +3,7 @@ use super::skiplist::SkipListBuilder; use super::StoreReader; use crate::common::CountingWriter; use crate::common::{BinarySerializable, VInt}; +use crate::directory::TerminatingWrite; use crate::directory::WritePtr; use crate::schema::Document; use crate::DocId; @@ -109,6 +110,6 @@ impl StoreWriter { self.offset_index_writer.write(&mut self.writer)?; header_offset.serialize(&mut self.writer)?; self.doc.serialize(&mut self.writer)?; - self.writer.flush() + self.writer.terminate() } } diff --git a/tests/failpoints/mod.rs b/tests/failpoints/mod.rs index 807ca7abc..509e3759f 100644 --- a/tests/failpoints/mod.rs +++ b/tests/failpoints/mod.rs @@ -1,7 +1,7 @@ use fail; use std::io::Write; use std::path::Path; -use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory}; +use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory, TerminatingWrite}; use tantivy::doc; use tantivy::schema::{Schema, TEXT}; use tantivy::{Index, Term}; @@ -17,7 +17,7 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() { managed_directory .open_write(test_path) .unwrap() - .flush() + .terminate() .unwrap(); assert!(managed_directory.exists(test_path)); // triggering gc and setting the delete operation to fail.