diff --git a/.gitignore b/.gitignore
index a6ef9ef50..428b92f40 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,5 @@ target/release
 Cargo.lock
 benchmark
 .DS_Store
-cpp/simdcomp/bitpackingbenchmark
\ No newline at end of file
+cpp/simdcomp/bitpackingbenchmark
+*.bk
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
index 03daa77ff..4d2999409 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,8 +37,11 @@ uuid = { version = "0.5", features = ["v4", "serde"] }
 chan = "0.1"
 version = "2"
 crossbeam = "0.2"
-futures = "0.1.9"
-futures-cpupool = "0.1.2"
+futures = "0.1"
+futures-cpupool = "0.1"
+error-chain = "0.8"
+owning_ref = "0.3"
+stable_deref_trait = "1.0.0"
 
 [target.'cfg(windows)'.dependencies]
 winapi = "0.2"
diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs
index fe947453d..b259bd335 100644
--- a/src/common/bitpacker.rs
+++ b/src/common/bitpacker.rs
@@ -2,7 +2,7 @@ use std::io::Write;
 use std::io;
 use common::serialize::BinarySerializable;
 use std::mem;
-
+use std::ops::Deref;
 
 /// Computes the number of bits that will be used for bitpacking.
 ///
@@ -91,15 +91,18 @@ impl BitPacker {
 
 
 
-pub struct BitUnpacker {
+pub struct BitUnpacker<Data>
+    where Data: Deref<Target = [u8]>
+{
     num_bits: usize,
     mask: u64,
-    data_ptr: *const u8,
-    data_len: usize,
+    data: Data,
 }
 
-impl BitUnpacker {
-    pub fn new(data: &[u8], num_bits: usize) -> BitUnpacker {
+impl<Data> BitUnpacker<Data>
+    where Data: Deref<Target = [u8]>
+{
+    pub fn new(data: Data, num_bits: usize) -> BitUnpacker<Data> {
         let mask: u64 = if num_bits == 64 {
             !0u64
         } else {
@@ -108,8 +111,7 @@ impl BitUnpacker {
         BitUnpacker {
             num_bits: num_bits,
             mask: mask,
-            data_ptr: data.as_ptr(),
-            data_len: data.len(),
+            data: data,
         }
     }
 
@@ -117,14 +119,18 @@ impl BitUnpacker {
         if self.num_bits == 0 {
             return 0;
         }
-        let addr = (idx * self.num_bits) / 8;
-        let bit_shift = idx * self.num_bits - addr * 8;
-        let val_unshifted_unmasked: u64;
-        debug_assert!(addr + 8 <= self.data_len,
+        let data: &[u8] = &*self.data;
+        let num_bits = self.num_bits;
+        let mask = self.mask;
+        let addr_in_bits = idx * num_bits;
+        let addr = addr_in_bits >> 3;
+        let bit_shift = addr_in_bits & 7;
+        debug_assert!(addr + 8 <= data.len(),
                       "The fast field field should have been padded with 7 bytes.");
-        val_unshifted_unmasked = unsafe { *(self.data_ptr.offset(addr as isize) as *const u64) };
+        let val_unshifted_unmasked: u64 =
+            unsafe { *(data.as_ptr().offset(addr as isize) as *const u64) };
         let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
-        (val_shifted & self.mask)
+        (val_shifted & mask)
     }
 }
 
@@ -160,7 +166,7 @@ mod test {
         let num_bytes = bitpacker.close(&mut data).unwrap();
         assert_eq!(num_bytes, (num_bits * len + 7) / 8 + 7);
         assert_eq!(data.len(), num_bytes);
-        let bitunpacker = BitUnpacker::new(&data, num_bits);
+        let bitunpacker = BitUnpacker::new(data, num_bits);
         for (i, val) in vals.iter().enumerate() {
             assert_eq!(bitunpacker.get(i), *val);
         }
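Context for the `BitUnpacker` change above: making it generic over `Data: Deref<Target = [u8]>` replaces the raw `data_ptr`/`data_len` pair, so the unpacker can either borrow a slice or own its backing bytes (the latter is what `fastfield/reader.rs` does further down with `OwningRef`). A minimal, self-contained sketch of the pattern — `ByteReader` is a toy stand-in, not tantivy's API:

```rust
use std::ops::Deref;

// Toy analogue of BitUnpacker<Data>: one type serves both owned
// and borrowed byte containers, as long as they deref to [u8].
struct ByteReader<Data: Deref<Target = [u8]>> {
    data: Data,
}

impl<Data: Deref<Target = [u8]>> ByteReader<Data> {
    fn get(&self, idx: usize) -> u8 {
        self.data[idx]
    }
}

fn main() {
    let owned = ByteReader { data: vec![1u8, 2, 3] }; // Data = Vec<u8>
    let bytes = [4u8, 5, 6];
    let borrowed = ByteReader { data: &bytes[..] }; // Data = &[u8]
    assert_eq!(owned.get(0), 1);
    assert_eq!(borrowed.get(2), 6);
}
```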
diff --git a/src/core/index.rs b/src/core/index.rs
index e43cfc7da..01a0abe54 100644
--- a/src/core/index.rs
+++ b/src/core/index.rs
@@ -1,5 +1,5 @@
 use Result;
-use Error;
+use error::{ErrorKind, ResultExt};
 use serde_json;
 use schema::Schema;
 use std::sync::Arc;
@@ -29,8 +29,7 @@ const NUM_SEARCHERS: usize = 12;
 fn load_metas(directory: &Directory) -> Result<IndexMeta> {
     let meta_data = directory.atomic_read(&META_FILEPATH)?;
     let meta_string = String::from_utf8_lossy(&meta_data);
-    serde_json::from_str(&meta_string)
-        .map_err(|e| Error::CorruptedFile(META_FILEPATH.clone(), Box::new(e)))
+    serde_json::from_str(&meta_string).chain_err(|| ErrorKind::CorruptedFile(META_FILEPATH.clone()))
 }
 
 /// Tantivy's Search Index
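The `.chain_err(…)` call is error-chain's replacement for the manual `CorruptedFile(path, Box::new(e))` wrapping: the serde_json error is kept as the cause, with `ErrorKind::CorruptedFile` on top. A minimal sketch of the mechanism, assuming error-chain 0.8 as pinned in `Cargo.toml` (`parse_meta` and the file name are made up for illustration):

```rust
#[macro_use]
extern crate error_chain;

error_chain! {
    errors {
        CorruptedFile(path: String) {
            description("file contains corrupted data")
            display("file contains corrupted data: '{}'", path)
        }
    }
}

fn parse_meta(raw: &str) -> Result<u64> {
    // On failure, the ParseIntError is preserved as the cause and
    // CorruptedFile becomes the top of the error chain.
    raw.trim()
        .parse::<u64>()
        .chain_err(|| ErrorKind::CorruptedFile("meta.json".into()))
}

fn main() {
    if let Err(e) = parse_meta("not-a-number") {
        for cause in e.iter() {
            println!("caused by: {}", cause);
        }
    }
}
```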
diff --git a/src/directory/error.rs b/src/directory/error.rs
index 2bc2b6ffe..d864012ea 100644
--- a/src/directory/error.rs
+++ b/src/directory/error.rs
@@ -1,5 +1,51 @@
+use std::error::Error as StdError;
 use std::path::PathBuf;
 use std::io;
+use std::fmt;
+
+/// General IO error with an optional path to the offending file.
+#[derive(Debug)]
+pub struct IOError {
+    path: Option<PathBuf>,
+    err: io::Error,
+}
+
+impl fmt::Display for IOError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self.path {
+            Some(ref path) => write!(f, "io error occurred on path '{:?}': '{}'", path, self.err),
+            None => write!(f, "io error occurred: '{}'", self.err),
+        }
+    }
+}
+
+impl StdError for IOError {
+    fn description(&self) -> &str {
+        "io error occurred"
+    }
+
+    fn cause(&self) -> Option<&StdError> {
+        Some(&self.err)
+    }
+}
+
+impl IOError {
+    pub(crate) fn with_path(path: PathBuf, err: io::Error) -> Self {
+        IOError {
+            path: Some(path),
+            err: err,
+        }
+    }
+}
+
+impl From<io::Error> for IOError {
+    fn from(err: io::Error) -> IOError {
+        IOError {
+            path: None,
+            err: err,
+        }
+    }
+}
 
 /// Error that may occur when opening a directory
 #[derive(Debug)]
@@ -10,6 +56,29 @@ pub enum OpenDirectoryError {
     NotADirectory(PathBuf),
 }
 
+impl fmt::Display for OpenDirectoryError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match *self {
+            OpenDirectoryError::DoesNotExist(ref path) => {
+                write!(f, "the underlying directory '{:?}' does not exist", path)
+            }
+            OpenDirectoryError::NotADirectory(ref path) => {
+                write!(f, "the path '{:?}' exists but is not a directory", path)
+            }
+        }
+    }
+}
+
+impl StdError for OpenDirectoryError {
+    fn description(&self) -> &str {
+        "error occurred while opening a directory"
+    }
+
+    fn cause(&self) -> Option<&StdError> {
+        None
+    }
+}
+
 /// Error that may occur when starting to write in a file
 #[derive(Debug)]
 pub enum OpenWriteError {
@@ -18,15 +87,43 @@ pub enum OpenWriteError {
     FileAlreadyExists(PathBuf),
     /// Any kind of IO error that happens when
     /// writing in the underlying IO device.
-    IOError(io::Error),
+    IOError(IOError),
 }
 
-impl From<io::Error> for OpenWriteError {
-    fn from(err: io::Error) -> OpenWriteError {
+impl From<IOError> for OpenWriteError {
+    fn from(err: IOError) -> OpenWriteError {
         OpenWriteError::IOError(err)
     }
 }
 
+impl fmt::Display for OpenWriteError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match *self {
+            OpenWriteError::FileAlreadyExists(ref path) => {
+                write!(f, "the file '{:?}' already exists", path)
+            }
+            OpenWriteError::IOError(ref err) => {
+                write!(f,
+                       "an io error occurred while opening a file for writing: '{}'",
+                       err)
+            }
+        }
+    }
+}
+
+impl StdError for OpenWriteError {
+    fn description(&self) -> &str {
+        "error occurred while opening a file for writing"
+    }
+
+    fn cause(&self) -> Option<&StdError> {
+        match *self {
+            OpenWriteError::FileAlreadyExists(_) => None,
+            OpenWriteError::IOError(ref err) => Some(err),
+        }
+    }
+}
+
 /// Error that may occur when accessing a file read
 #[derive(Debug)]
 pub enum OpenReadError {
@@ -34,9 +131,42 @@ pub enum OpenReadError {
     FileDoesNotExist(PathBuf),
     /// Any kind of IO error that happens when
    /// interacting with the underlying IO device.
-    IOError(io::Error),
+    IOError(IOError),
 }
 
+impl From<IOError> for OpenReadError {
+    fn from(err: IOError) -> OpenReadError {
+        OpenReadError::IOError(err)
+    }
+}
+
+impl fmt::Display for OpenReadError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match *self {
+            OpenReadError::FileDoesNotExist(ref path) => {
+                write!(f, "the file '{:?}' does not exist", path)
+            }
+            OpenReadError::IOError(ref err) => {
+                write!(f,
+                       "an io error occurred while opening a file for reading: '{}'",
+                       err)
+            }
+        }
+    }
+}
+
+impl StdError for OpenReadError {
+    fn description(&self) -> &str {
+        "error occurred while opening a file for reading"
+    }
+
+    fn cause(&self) -> Option<&StdError> {
+        match *self {
+            OpenReadError::FileDoesNotExist(_) => None,
+            OpenReadError::IOError(ref err) => Some(err),
+        }
+    }
+}
 
 /// Error that may occur when trying to delete a file
 #[derive(Debug)]
@@ -45,8 +175,44 @@ pub enum DeleteError {
     FileDoesNotExist(PathBuf),
     /// Any kind of IO error that happens when
     /// interacting with the underlying IO device.
-    IOError(io::Error),
+    IOError(IOError),
     /// The file may not be deleted because it is
     /// protected.
     FileProtected(PathBuf),
 }
+
+impl From<IOError> for DeleteError {
+    fn from(err: IOError) -> DeleteError {
+        DeleteError::IOError(err)
+    }
+}
+
+impl fmt::Display for DeleteError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match *self {
+            DeleteError::FileDoesNotExist(ref path) => {
+                write!(f, "the file '{:?}' does not exist", path)
+            }
+            DeleteError::FileProtected(ref path) => {
+                write!(f, "the file '{:?}' is protected and can't be deleted", path)
+            }
+            DeleteError::IOError(ref err) => {
+                write!(f, "an io error occurred while deleting a file: '{}'", err)
+            }
+        }
+    }
+}
+
+impl StdError for DeleteError {
+    fn description(&self) -> &str {
+        "error occurred while deleting a file"
+    }
+
+    fn cause(&self) -> Option<&StdError> {
+        match *self {
+            DeleteError::FileDoesNotExist(_) |
+            DeleteError::FileProtected(_) => None,
+            DeleteError::IOError(ref err) => Some(err),
+        }
+    }
+}
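The new `IOError` keeps the offending path alongside the `io::Error`, and every directory error now implements `Display` and `std::error::Error`, so the underlying cause stays reachable via `cause()`. A self-contained sketch of walking such a chain — `PathedIoError` is a toy stand-in for the types above:

```rust
use std::error::Error;
use std::fmt;
use std::io;

#[derive(Debug)]
struct PathedIoError {
    path: String,
    err: io::Error,
}

impl fmt::Display for PathedIoError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "io error occurred on path '{}': '{}'", self.path, self.err)
    }
}

impl Error for PathedIoError {
    fn description(&self) -> &str {
        "io error occurred"
    }
    fn cause(&self) -> Option<&Error> {
        Some(&self.err)
    }
}

fn main() {
    let inner = io::Error::new(io::ErrorKind::NotFound, "no such file");
    let outer = PathedIoError { path: "meta.json".into(), err: inner };
    // Walk the cause chain, outermost first.
    let mut err: Option<&Error> = Some(&outer);
    while let Some(e) = err {
        println!("{}", e);
        err = e.cause();
    }
}
```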
diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs
index 7b87bddb7..29cbef114 100644
--- a/src/directory/managed_directory.rs
+++ b/src/directory/managed_directory.rs
@@ -1,6 +1,6 @@
 use std::path::{Path, PathBuf};
 use serde_json;
-use directory::error::{OpenReadError, DeleteError, OpenWriteError};
+use directory::error::{IOError, OpenReadError, DeleteError, OpenWriteError};
 use directory::{ReadOnlySource, WritePtr};
 use std::result;
 use std::io;
@@ -12,8 +12,7 @@ use std::io::Write;
 use core::MANAGED_FILEPATH;
 use std::collections::HashMap;
 use std::fmt;
-use Result;
-use Error;
+use error::{Result, ErrorKind, ResultExt};
 
 /// Wrapper of directories that keeps track of files created by Tantivy.
 ///
@@ -86,7 +85,7 @@ impl ManagedDirectory {
             let managed_files_json = String::from_utf8_lossy(&data);
             let managed_files: HashSet<PathBuf> = serde_json::from_str(&managed_files_json)
-                .map_err(|e| Error::CorruptedFile(MANAGED_FILEPATH.clone(), Box::new(e)))?;
+                .chain_err(|| ErrorKind::CorruptedFile(MANAGED_FILEPATH.clone()))?;
             Ok(ManagedDirectory {
                    directory: box directory,
                    meta_informations: Arc::new(RwLock::new(MetaInformation {
@@ -230,7 +229,8 @@ impl Directory for ManagedDirectory {
     }
 
     fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
-        self.register_file_as_managed(path)?;
+        self.register_file_as_managed(path)
+            .map_err(|e| IOError::with_path(path.to_owned(), e))?;
         self.directory.open_write(path)
     }
diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs
index f01c813bd..625bae327 100644
--- a/src/directory/mmap_directory.rs
+++ b/src/directory/mmap_directory.rs
@@ -1,7 +1,7 @@
 use atomicwrites;
 use common::make_io_err;
 use directory::Directory;
-use directory::error::{OpenWriteError, OpenReadError, DeleteError, OpenDirectoryError};
+use directory::error::{IOError, OpenWriteError, OpenReadError, DeleteError, OpenDirectoryError};
 use directory::ReadOnlySource;
 use directory::shared_vec_slice::SharedVecSlice;
 use directory::WritePtr;
@@ -24,13 +24,15 @@ use std::sync::Weak;
 use tempdir::TempDir;
 
 fn open_mmap(full_path: &PathBuf) -> result::Result<Option<Arc<Mmap>>, OpenReadError> {
-    let convert_file_error = |err: io::Error| if err.kind() == io::ErrorKind::NotFound {
-        OpenReadError::FileDoesNotExist(full_path.clone())
-    } else {
-        OpenReadError::IOError(err)
-    };
-    let file = File::open(&full_path).map_err(convert_file_error)?;
-    let meta_data = file.metadata().map_err(OpenReadError::IOError)?;
+    let file = File::open(&full_path)
+        .map_err(|e| if e.kind() == io::ErrorKind::NotFound {
+                     OpenReadError::FileDoesNotExist(full_path.clone())
+                 } else {
+                     OpenReadError::IOError(IOError::with_path(full_path.to_owned(), e))
+                 })?;
+
+    let meta_data = file.metadata()
+        .map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
     if meta_data.len() == 0 {
         // if the file size is 0, it will not be possible
         // to mmap the file, so we return an anonymous mmap_cache
@@ -39,7 +41,7 @@ fn open_mmap(full_path: &PathBuf) -> result::Result<Option<Arc<Mmap>>, OpenRead
     }
     match Mmap::open(&file, Protection::Read) {
         Ok(mmap) => Ok(Some(Arc::new(mmap))),
-        Err(e) => Err(OpenReadError::IOError(e)),
+        Err(e) => Err(IOError::with_path(full_path.to_owned(), e))?,
     }
 }
@@ -274,7 +276,7 @@ impl Directory for MmapDirectory {
                 let msg = format!("Failed to acquired write lock \
                                    on mmap cache while reading {:?}",
                                   path);
-                OpenReadError::IOError(make_io_err(msg))
+                IOError::with_path(path.to_owned(), make_io_err(msg))
             })?;
 
         Ok(mmap_cache
@@ -295,17 +297,19 @@ impl Directory for MmapDirectory {
 
         let mut file = open_res
             .map_err(|err| if err.kind() == io::ErrorKind::AlreadyExists {
-                         OpenWriteError::FileAlreadyExists(PathBuf::from(path))
+                         OpenWriteError::FileAlreadyExists(path.to_owned())
                      } else {
-                         OpenWriteError::IOError(err)
+                         IOError::with_path(path.to_owned(), err).into()
                     })?;
 
         // making sure the file is created.
-        try!(file.flush());
+        file.flush()
+            .map_err(|e| IOError::with_path(path.to_owned(), e))?;
 
         // Apparetntly, on some filesystem syncing the parent
         // directory is required.
-        try!(self.sync_directory());
+        self.sync_directory()
+            .map_err(|e| IOError::with_path(path.to_owned(), e))?;
 
         let writer = SafeFileWriter::new(file);
         Ok(BufWriter::new(Box::new(writer)))
@@ -320,19 +324,22 @@ impl Directory for MmapDirectory {
                 let msg = format!("Failed to acquired write lock \
                                    on mmap cache while deleting {:?}",
                                   path);
-                DeleteError::IOError(make_io_err(msg))
+                IOError::with_path(path.to_owned(), make_io_err(msg))
             })?;
         // Removing the entry in the MMap cache.
         // The munmap will appear on Drop,
         // when the last reference is gone.
         mmap_cache.cache.remove(&full_path);
         match fs::remove_file(&full_path) {
-            Ok(_) => self.sync_directory().map_err(DeleteError::IOError),
+            Ok(_) => {
+                self.sync_directory()
+                    .map_err(|e| IOError::with_path(path.to_owned(), e).into())
+            }
             Err(e) => {
                 if e.kind() == io::ErrorKind::NotFound {
                     Err(DeleteError::FileDoesNotExist(path.to_owned()))
                 } else {
-                    Err(DeleteError::IOError(e))
+                    Err(IOError::with_path(path.to_owned(), e).into())
                 }
             }
         }
@@ -349,14 +356,14 @@ impl Directory for MmapDirectory {
         match File::open(&full_path) {
             Ok(mut file) => {
                 file.read_to_end(&mut buffer)
-                    .map_err(OpenReadError::IOError)?;
+                    .map_err(|e| IOError::with_path(path.to_owned(), e))?;
                 Ok(buffer)
             }
             Err(e) => {
                 if e.kind() == io::ErrorKind::NotFound {
                     Err(OpenReadError::FileDoesNotExist(path.to_owned()))
                 } else {
-                    Err(OpenReadError::IOError(e))
+                    Err(IOError::with_path(path.to_owned(), e).into())
                 }
             }
         }
diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs
index 656eb739d..0f205c6f1 100644
--- a/src/directory/ram_directory.rs
+++ b/src/directory/ram_directory.rs
@@ -6,7 +6,7 @@ use std::result;
 use std::sync::{Arc, RwLock};
 use common::make_io_err;
 use directory::{Directory, ReadOnlySource};
-use directory::error::{OpenWriteError, OpenReadError, DeleteError};
+use directory::error::{IOError, OpenWriteError, OpenReadError, DeleteError};
 use directory::WritePtr;
 use super::shared_vec_slice::SharedVecSlice;
 
@@ -97,7 +97,7 @@ impl InnerDirectory {
                                directory when trying to read {:?}",
                               path);
             let io_err = make_io_err(msg);
-            OpenReadError::IOError(io_err)
+            OpenReadError::IOError(IOError::with_path(path.to_owned(), io_err))
         })
             .and_then(|readable_map| {
                 readable_map
@@ -115,7 +115,7 @@ impl InnerDirectory {
                                directory when trying to delete {:?}",
                               path);
             let io_err = make_io_err(msg);
-            DeleteError::IOError(io_err)
+            DeleteError::IOError(IOError::with_path(path.to_owned(), io_err))
         })
             .and_then(|mut writable_map| match writable_map.remove(path) {
                           Some(_) => Ok(()),
@@ -163,8 +163,13 @@ impl Directory for RAMDirectory {
     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
         let path_buf = PathBuf::from(path);
         let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone());
+
+        let exists = self.fs
+            .write(path_buf.clone(), &Vec::new())
+            .map_err(|err| IOError::with_path(path.to_owned(), err))?;
+
         // force the creation of the file to mimic the MMap directory.
-        if try!(self.fs.write(path_buf.clone(), &Vec::new())) {
+        if exists {
             Err(OpenWriteError::FileAlreadyExists(path_buf))
         } else {
             Ok(BufWriter::new(Box::new(vec_writer)))
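A pattern worth noting in the two directory implementations above: call sites build an `IOError` with `with_path` and rely on the `From<IOError>` impls (via `?` or `.into()`) to lift it into `OpenReadError`/`OpenWriteError`/`DeleteError`. A reduced sketch of the same shape — the types here are toys, not the real ones:

```rust
use std::fs::File;
use std::io::{self, Read};

#[derive(Debug)]
struct IOError(io::Error);

#[derive(Debug)]
enum OpenReadError {
    FileDoesNotExist(String),
    IOError(IOError),
}

impl From<IOError> for OpenReadError {
    fn from(err: IOError) -> OpenReadError {
        OpenReadError::IOError(err)
    }
}

fn atomic_read(path: &str) -> Result<Vec<u8>, OpenReadError> {
    let mut buffer = Vec::new();
    match File::open(path) {
        Ok(mut file) => {
            // `?` converts IOError -> OpenReadError through the From impl,
            // so the variant never appears at the call site.
            file.read_to_end(&mut buffer).map_err(|e| IOError(e))?;
            Ok(buffer)
        }
        Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
            Err(OpenReadError::FileDoesNotExist(path.to_string()))
        }
        // `Err(x)?` also routes through From, like `.into()`.
        Err(e) => Err(IOError(e))?,
    }
}

fn main() {
    println!("{:?}", atomic_read("no-such-file"));
}
```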
diff --git a/src/directory/read_only_source.rs b/src/directory/read_only_source.rs
index 478936c75..d327f5a51 100644
--- a/src/directory/read_only_source.rs
+++ b/src/directory/read_only_source.rs
@@ -2,7 +2,7 @@ use fst::raw::MmapReadOnly;
 use std::ops::Deref;
 use super::shared_vec_slice::SharedVecSlice;
 use common::HasLen;
-
+use stable_deref_trait::StableDeref;
 
 /// Read object that represents files in tantivy.
 ///
@@ -17,6 +17,8 @@ pub enum ReadOnlySource {
     Anonymous(SharedVecSlice),
 }
 
+unsafe impl StableDeref for ReadOnlySource {}
+
 impl Deref for ReadOnlySource {
     type Target = [u8];
 
diff --git a/src/error.rs b/src/error.rs
index 83077c633..d6ce4a33d 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,89 +1,122 @@
-/// Definition of Tantivy's error and result.
+//! Definition of Tantivy's error and result.
 
 use std::io;
 use std::path::PathBuf;
-use std::error;
 use std::sync::PoisonError;
-use directory::error::{OpenReadError, OpenWriteError, OpenDirectoryError};
+use directory::error::{IOError, OpenReadError, OpenWriteError, OpenDirectoryError};
 use query;
 use schema;
 use fastfield::FastFieldNotAvailableError;
 use serde_json;
-
-/// Generic tantivy error.
-///
-/// Any specialized error return in tantivy can be converted in `tantivy::Error`.
-#[derive(Debug)]
-pub enum Error {
-    /// Path does not exist.
-    PathDoesNotExist(PathBuf),
-    /// File already exists, this is a problem when we try to write into a new file.
-    FileAlreadyExists(PathBuf),
-    /// IO Error
-    IOError(io::Error),
-    /// A thread holding the locked panicked and poisoned the lock.
-    Poisoned,
-    /// The data within is corrupted.
-    ///
-    /// For instance, it contains invalid JSON.
-    CorruptedFile(PathBuf, Box<error::Error + Send + Sync>),
-    /// Invalid argument was passed by the user.
-    InvalidArgument(String),
-    /// An Error happened in one of the thread
-    ErrorInThread(String),
-    /// An Error appeared related to the lack of a field.
-    SchemaError(String),
-    /// Tried to access a fastfield reader for a field not configured accordingly.
-    FastFieldError(FastFieldNotAvailableError),
-}
+error_chain!(
+    errors {
+        /// Path does not exist.
+        PathDoesNotExist(buf: PathBuf) {
+            description("path does not exist")
+            display("path does not exist: '{:?}'", buf)
+        }
+        /// File already exists, this is a problem when we try to write into a new file.
+        FileAlreadyExists(buf: PathBuf) {
+            description("file already exists")
+            display("file already exists: '{:?}'", buf)
+        }
+        /// IO Error.
+        IOError(err: IOError) {
+            description("an IO error occurred")
+            display("an IO error occurred: '{}'", err)
+        }
+        /// The data within is corrupted.
+        ///
+        /// For instance, it contains invalid JSON.
+        CorruptedFile(buf: PathBuf) {
+            description("file contains corrupted data")
+            display("file contains corrupted data: '{:?}'", buf)
+        }
+        /// A thread holding the locked panicked and poisoned the lock.
+        Poisoned {
+            description("a thread holding the locked panicked and poisoned the lock")
+        }
+        /// Invalid argument was passed by the user.
+        InvalidArgument(arg: String) {
+            description("an invalid argument was passed")
+            display("an invalid argument was passed: '{}'", arg)
+        }
+        /// An Error happened in one of the thread.
+        ErrorInThread(err: String) {
+            description("an error occurred in a thread")
+            display("an error occurred in a thread: '{}'", err)
+        }
+        /// An Error appeared related to the lack of a field.
+        SchemaError(field: String) {
+            description("a schema field is missing")
+            display("a schema field is missing: '{}'", field)
+        }
+        /// Tried to access a fastfield reader for a field not configured accordingly.
+        FastFieldError(err: FastFieldNotAvailableError) {
+            description("fast field not available")
+            display("fast field not available: '{:?}'", err)
+        }
+    }
+);
 
 impl From<FastFieldNotAvailableError> for Error {
     fn from(fastfield_error: FastFieldNotAvailableError) -> Error {
-        Error::FastFieldError(fastfield_error)
+        ErrorKind::FastFieldError(fastfield_error).into()
+    }
+}
+
+impl From<IOError> for Error {
+    fn from(io_error: IOError) -> Error {
+        ErrorKind::IOError(io_error).into()
     }
 }
 
 impl From<io::Error> for Error {
     fn from(io_error: io::Error) -> Error {
-        Error::IOError(io_error)
+        ErrorKind::IOError(io_error.into()).into()
     }
 }
 
 impl From<query::QueryParserError> for Error {
     fn from(parsing_error: query::QueryParserError) -> Error {
-        Error::InvalidArgument(format!("Query is invalid. {:?}", parsing_error))
+        ErrorKind::InvalidArgument(format!("Query is invalid. {:?}", parsing_error)).into()
     }
 }
 
 impl<Guard> From<PoisonError<Guard>> for Error {
     fn from(_: PoisonError<Guard>) -> Error {
-        Error::Poisoned
+        ErrorKind::Poisoned.into()
     }
 }
 
 impl From<OpenReadError> for Error {
     fn from(error: OpenReadError) -> Error {
         match error {
-            OpenReadError::FileDoesNotExist(filepath) => Error::PathDoesNotExist(filepath),
-            OpenReadError::IOError(io_error) => Error::IOError(io_error),
+            OpenReadError::FileDoesNotExist(filepath) => {
+                ErrorKind::PathDoesNotExist(filepath).into()
+            }
+            OpenReadError::IOError(io_error) => ErrorKind::IOError(io_error).into(),
         }
     }
 }
 
 impl From<schema::DocParsingError> for Error {
     fn from(error: schema::DocParsingError) -> Error {
-        Error::InvalidArgument(format!("Failed to parse document {:?}", error))
+        ErrorKind::InvalidArgument(format!("Failed to parse document {:?}", error)).into()
     }
 }
 
 impl From<OpenWriteError> for Error {
     fn from(error: OpenWriteError) -> Error {
         match error {
-            OpenWriteError::FileAlreadyExists(filepath) => Error::FileAlreadyExists(filepath),
-            OpenWriteError::IOError(io_error) => Error::IOError(io_error),
-        }
+            OpenWriteError::FileAlreadyExists(filepath) => {
+                ErrorKind::FileAlreadyExists(filepath)
+            }
+            OpenWriteError::IOError(io_error) => ErrorKind::IOError(io_error),
+        }
+        .into()
     }
 }
 
@@ -91,10 +124,11 @@ impl From<OpenDirectoryError> for Error {
     fn from(error: OpenDirectoryError) -> Error {
         match error {
             OpenDirectoryError::DoesNotExist(directory_path) => {
-                Error::PathDoesNotExist(directory_path)
+                ErrorKind::PathDoesNotExist(directory_path).into()
             }
             OpenDirectoryError::NotADirectory(directory_path) => {
-                Error::InvalidArgument(format!("{:?} is not a directory", directory_path))
+                ErrorKind::InvalidArgument(format!("{:?} is not a directory", directory_path))
+                    .into()
             }
         }
     }
@@ -102,6 +136,7 @@ impl From<OpenDirectoryError> for Error {
 
 impl From<serde_json::Error> for Error {
     fn from(error: serde_json::Error) -> Error {
-        Error::IOError(error.into())
+        let io_err = io::Error::from(error);
+        ErrorKind::IOError(io_err.into()).into()
     }
 }
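The `unsafe impl StableDeref for ReadOnlySource {}` line is the contract that lets `owning_ref` (used in `fastfield/reader.rs` below) keep a `ReadOnlySource` and a slice into it in one value: the marker asserts that the deref target's address survives moves of the owner, which holds here because the source is an mmap or a shared `Vec`. A toy illustration of the same assertion — `Buffer` is hypothetical, and only the heap indirection makes the impl sound:

```rust
extern crate stable_deref_trait;

use std::ops::Deref;
use stable_deref_trait::StableDeref;

// Owner whose Deref target lives behind a heap allocation,
// so its address is stable across moves of the owner itself.
struct Buffer(Box<[u8]>);

impl Deref for Buffer {
    type Target = [u8];
    fn deref(&self) -> &[u8] {
        &self.0
    }
}

// Safety contract: deref must return the same address even if the
// `Buffer` value is moved. True here because the bytes are boxed.
unsafe impl StableDeref for Buffer {}

fn main() {
    let buf = Buffer(vec![1u8, 2, 3].into_boxed_slice());
    let before = buf.as_ptr();
    let moved = buf; // moving the owner...
    assert_eq!(before, moved.as_ptr()); // ...does not move the bytes
}
```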
diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs
index 54488e9ed..ee1fc1cd4 100644
--- a/src/fastfield/reader.rs
+++ b/src/fastfield/reader.rs
@@ -13,6 +13,7 @@ use common::bitpacker::compute_num_bits;
 use common::bitpacker::BitUnpacker;
 use schema::FieldType;
 use common;
+use owning_ref::OwningRef;
 
 /// Trait for accessing a fastfield.
 ///
@@ -39,8 +40,7 @@ pub trait FastFieldReader: Sized {
 
 /// `FastFieldReader` for unsigned 64-bits integers.
 pub struct U64FastFieldReader {
-    _data: ReadOnlySource,
-    bit_unpacker: BitUnpacker,
+    bit_unpacker: BitUnpacker<OwningRef<ReadOnlySource, [u8]>>,
     min_value: u64,
     max_value: u64,
 }
@@ -85,25 +85,23 @@ impl FastFieldReader for U64FastFieldReader {
     /// Panics if the data is corrupted.
     fn open(data: ReadOnlySource) -> U64FastFieldReader {
         let min_value: u64;
-        let max_value: u64;
-        let bit_unpacker: BitUnpacker;
-
+        let amplitude: u64;
         {
-            let mut cursor: &[u8] = data.as_slice();
+            let mut cursor = data.as_slice();
             min_value = u64::deserialize(&mut cursor)
                 .expect("Failed to read the min_value of fast field.");
-            let amplitude = u64::deserialize(&mut cursor)
+            amplitude = u64::deserialize(&mut cursor)
                 .expect("Failed to read the amplitude of fast field.");
-            max_value = min_value + amplitude;
-            let num_bits = compute_num_bits(amplitude);
-            bit_unpacker = BitUnpacker::new(cursor, num_bits as usize)
-        }
+        }
+        let max_value = min_value + amplitude;
+        let num_bits = compute_num_bits(amplitude);
+        let owning_ref = OwningRef::new(data).map(|data| &data[16..]);
+        let bit_unpacker = BitUnpacker::new(owning_ref, num_bits as usize);
         U64FastFieldReader {
-            _data: data,
-            bit_unpacker: bit_unpacker,
             min_value: min_value,
             max_value: max_value,
+            bit_unpacker: bit_unpacker,
         }
     }
 }
@@ -161,6 +159,12 @@ impl I64FastFieldReader {
 impl FastFieldReader for I64FastFieldReader {
     type ValueType = i64;
 
+    ///
+    ///
+    /// # Panics
+    ///
+    /// May panic or return wrong random result if `doc`
+    /// is greater or equal to the segment's `maxdoc`.
     fn get(&self, doc: DocId) -> i64 {
         common::u64_to_i64(self.underlying.get(doc))
     }
@@ -244,8 +248,3 @@ impl FastFieldsReader {
         })
     }
 }
-
-unsafe impl Send for U64FastFieldReader {}
-unsafe impl Sync for U64FastFieldReader {}
-unsafe impl Send for I64FastFieldReader {}
-unsafe impl Sync for I64FastFieldReader {}
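With `ReadOnlySource: StableDeref`, the reader can skip the 16 header bytes (`min_value` and `amplitude`, two serialized `u64`s) once and hand the unpacker an owning slice, which is why the separate `_data` field and the hand-written `unsafe impl Send/Sync` lines at the bottom of the file can go. A self-contained sketch of the `OwningRef` move, assuming the owning_ref 0.3 API:

```rust
extern crate owning_ref;

use owning_ref::OwningRef;

fn main() {
    let data: Vec<u8> = (0u8..32).collect();
    // Keep ownership of the whole buffer while exposing only the
    // payload after a 16-byte header; owner and slice move together.
    let payload: OwningRef<Vec<u8>, [u8]> = OwningRef::new(data).map(|d| &d[16..]);
    assert_eq!(payload.len(), 16);
    assert_eq!(payload[0], 16);
}
```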
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index eb8117a03..aa577f317 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -9,7 +9,7 @@ use core::SegmentReader;
 use indexer::stamper::Stamper;
 use datastruct::stacker::Heap;
 use directory::FileProtection;
-use Error;
+use error::{Error, ErrorKind, Result, ResultExt};
 use Directory;
 use fastfield::write_delete_bitset;
 use indexer::delete_queue::{DeleteCursor, DeleteQueue};
@@ -22,7 +22,6 @@ use indexer::SegmentEntry;
 use indexer::SegmentWriter;
 use postings::DocSet;
 use postings::SegmentPostingsOption;
-use Result;
 use schema::Document;
 use schema::Schema;
 use schema::Term;
@@ -325,19 +324,17 @@ impl IndexWriter {
 
         let former_workers_handles = mem::replace(&mut self.workers_join_handle, vec![]);
         for join_handle in former_workers_handles {
-            try!(join_handle
-                     .join()
-                     .expect("Indexing Worker thread panicked")
-                     .map_err(|e| {
-                                  Error::ErrorInThread(format!("Error in indexing worker thread. {:?}", e))
-                              }));
+            join_handle
+                .join()
+                .expect("Indexing Worker thread panicked")
+                .chain_err(|| ErrorKind::ErrorInThread("Error in indexing worker thread.".into()))?;
         }
         drop(self.workers_join_handle);
 
         let result = self.segment_updater
             .wait_merging_thread()
-            .map_err(|_| Error::ErrorInThread("Failed to join merging thread.".to_string()));
+            .chain_err(|| ErrorKind::ErrorInThread("Failed to join merging thread.".into()));
 
         if let Err(ref e) = result {
             error!("Some merging thread failed {:?}", e);
@@ -527,12 +524,13 @@ impl IndexWriter {
 
         for worker_handle in former_workers_join_handle {
             let indexing_worker_result =
-                try!(worker_handle
-                         .join()
-                         .map_err(|e| Error::ErrorInThread(format!("{:?}", e))));
-            try!(indexing_worker_result);
+                worker_handle
+                    .join()
+                    .map_err(|e| Error::from_kind(ErrorKind::ErrorInThread(format!("{:?}", e))))?;
+
+            indexing_worker_result?;
             // add a new worker for the next generation.
-            try!(self.add_indexing_worker());
+            self.add_indexing_worker()?;
         }
@@ -603,7 +601,7 @@ mod tests {
     use schema::{self, Document};
     use Index;
    use Term;
-    use Error;
+    use error::*;
     use env_logger;
 
     #[test]
@@ -612,7 +610,7 @@ mod tests {
        let index = Index::create_in_ram(schema_builder.build());
         let _index_writer = index.writer(40_000_000).unwrap();
         match index.writer(40_000_000) {
-            Err(Error::FileAlreadyExists(_)) => {}
+            Err(Error(ErrorKind::FileAlreadyExists(_), _)) => {}
             _ => panic!("Expected FileAlreadyExists error"),
         }
     }
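The test's new pattern works because error-chain generates `Error` as a tuple struct whose first field is the `ErrorKind` (state such as the backtrace lives in the second). A minimal sketch of matching on it — the `error_chain!` setup, `create`, and the lock-file name are hypothetical:

```rust
#[macro_use]
extern crate error_chain;

error_chain! {
    errors {
        FileAlreadyExists(path: String) {
            description("file already exists")
            display("file already exists: '{}'", path)
        }
    }
}

fn create() -> Result<()> {
    // ErrorKind converts into the generated Error via From.
    Err(ErrorKind::FileAlreadyExists("example.lock".into()).into())
}

fn main() {
    match create() {
        // Kind first, state second: match on the first tuple field.
        Err(Error(ErrorKind::FileAlreadyExists(_), _)) => println!("already locked"),
        other => panic!("unexpected: {:?}", other),
    }
}
```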
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 4c192549c..b9a536c8d 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1,4 +1,4 @@
-use {Error, Result};
+use error::{ErrorKind, Result};
 use core::SegmentReader;
 use core::Segment;
 use DocId;
@@ -161,7 +161,7 @@ impl IndexMerger {
                     let error_msg = format!("Failed to find a u64_reader for field {:?}",
                                             field);
                     error!("{}", error_msg);
-                    return Err(Error::SchemaError(error_msg));
+                    bail!(ErrorKind::SchemaError(error_msg));
                 }
             }
         }
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index 544613228..cc04a77f7 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -7,7 +7,7 @@ use core::SegmentMeta;
 use core::SerializableSegment;
 use directory::Directory;
 use indexer::stamper::Stamper;
-use Error;
+use error::{Error, ErrorKind, Result};
 use futures_cpupool::CpuPool;
 use futures::Future;
 use futures::Canceled;
@@ -19,7 +19,6 @@ use indexer::MergeCandidate;
 use indexer::merger::IndexMerger;
 use indexer::SegmentEntry;
 use indexer::SegmentSerializer;
-use Result;
 use futures_cpupool::CpuFuture;
 use serde_json;
 use indexer::delete_queue::DeleteCursor;
@@ -117,7 +116,7 @@ fn perform_merge(segment_ids: &[SegmentId],
             error!("Error, had to abort merge as some of the segment is not managed anymore.");
             let msg = format!("Segment {:?} requested for merge is not managed.",
                               segment_id);
-            return Err(Error::InvalidArgument(msg));
+            bail!(ErrorKind::InvalidArgument(msg));
         }
     }
@@ -448,7 +447,7 @@ impl SegmentUpdater {
             merging_thread_handle
                 .join()
                 .map(|_| ())
-                .map_err(|_| Error::ErrorInThread("Merging thread failed.".to_string()))?
+                .map_err(|_| ErrorKind::ErrorInThread("Merging thread failed.".into()))?;
         }
         // Our merging thread may have queued their completed
         self.run_async(move |_| {}).wait()?;
diff --git a/src/lib.rs b/src/lib.rs
index 899532617..5d13c8299 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -35,6 +35,9 @@ extern crate serde_derive;
 #[macro_use]
 extern crate log;
 
+#[macro_use]
+extern crate error_chain;
+
 #[macro_use]
 extern crate version;
 extern crate fst;
@@ -58,6 +61,8 @@ extern crate crossbeam;
 extern crate bit_set;
 extern crate futures;
 extern crate futures_cpupool;
+extern crate owning_ref;
+extern crate stable_deref_trait;
 
 #[cfg(test)]
 extern crate env_logger;
@@ -83,7 +88,7 @@ mod functional_test;
 #[macro_use]
 mod macros;
 
-pub use error::Error;
+pub use error::{Error, ErrorKind, ResultExt};
 
 /// Tantivy result.
 pub type Result<T> = std::result::Result<T, Error>;
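`bail!`, used in `merger.rs` and `segment_updater.rs` above, is error-chain's early-return macro: it expands to roughly `return Err(kind.into())`, replacing the old `return Err(Error::…)` lines. A sketch under the same hypothetical setup, with the error-chain version as pinned by this PR:

```rust
#[macro_use]
extern crate error_chain;

error_chain! {
    errors {
        SchemaError(msg: String) {
            description("a schema field is missing")
            display("a schema field is missing: '{}'", msg)
        }
    }
}

fn check(has_field: bool) -> Result<()> {
    if !has_field {
        // Equivalent to: return Err(Error::from_kind(ErrorKind::SchemaError(..)));
        bail!(ErrorKind::SchemaError("no u64 fast field".into()));
    }
    Ok(())
}

fn main() {
    assert!(check(true).is_ok());
    assert!(check(false).is_err());
}
```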
diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs
index 36f489290..d5e72fa68 100644
--- a/src/postings/serializer.rs
+++ b/src/postings/serializer.rs
@@ -186,7 +186,8 @@ impl PostingsSerializer {
             // On the other hand, positions are entirely buffered until the
             // end of the term, at which point they are compressed and written.
             if self.text_indexing_options.is_position_enabled() {
-                self.written_bytes_positions += try!(VInt(self.position_deltas.len() as u64)
+                self.written_bytes_positions +=
+                    try!(VInt(self.position_deltas.len() as u64)
                          .serialize(&mut self.positions_write));
                 let positions_encoded: &[u8] = self.positions_encoder
                     .compress_unsorted(&self.position_deltas[..]);
diff --git a/src/query/boolean_query/boolean_query.rs b/src/query/boolean_query/boolean_query.rs
index caa47dba1..b471da320 100644
--- a/src/query/boolean_query/boolean_query.rs
+++ b/src/query/boolean_query/boolean_query.rs
@@ -37,8 +37,7 @@ impl Query for BooleanQuery {
     }
 
     fn weight(&self, searcher: &Searcher) -> Result<Box<Weight>> {
-        let sub_weights =
-            try!(self.subqueries
+        let sub_weights = try!(self.subqueries
                      .iter()
                      .map(|&(ref _occur, ref subquery)| subquery.weight(searcher))
                      .collect());
diff --git a/src/query/boolean_query/boolean_weight.rs b/src/query/boolean_query/boolean_weight.rs
index cb3f2f4f6..04f22595c 100644
--- a/src/query/boolean_query/boolean_weight.rs
+++ b/src/query/boolean_query/boolean_weight.rs
@@ -22,7 +22,8 @@ impl BooleanWeight {
 
 impl Weight for BooleanWeight {
     fn scorer<'a>(&'a self, reader: &'a SegmentReader) -> Result<Box<Scorer + 'a>> {
-        let sub_scorers: Vec<Box<Scorer + 'a>> = try!(self.weights
+        let sub_scorers: Vec<Box<Scorer + 'a>> =
+            try!(self.weights
                  .iter()
                  .map(|weight| weight.scorer(reader))
                  .collect());
diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs
index 341fa0633..6967be9c7 100644
--- a/src/termdict/merger.rs
+++ b/src/termdict/merger.rs
@@ -48,6 +48,7 @@ impl<'a, V> Ord for HeapItem<'a, V>
 /// - the term
 /// - a slice with the ordinal of the segments containing
 /// the terms.
+#[allow(should_implement_trait)]
 pub struct TermMerger<'a, V>
     where V: 'a + BinarySerializable + Default
 {