Merge branch 'master' into exp/hash_intable

Paul Masurel
2017-05-30 08:27:33 +09:00
19 changed files with 370 additions and 143 deletions

.gitignore vendored
View File

@@ -5,4 +5,5 @@ target/release
Cargo.lock
benchmark
.DS_Store
cpp/simdcomp/bitpackingbenchmark
cpp/simdcomp/bitpackingbenchmark
*.bk

View File

@@ -37,8 +37,11 @@ uuid = { version = "0.5", features = ["v4", "serde"] }
chan = "0.1"
version = "2"
crossbeam = "0.2"
futures = "0.1.9"
futures-cpupool = "0.1.2"
futures = "0.1"
futures-cpupool = "0.1"
error-chain = "0.8"
owning_ref = "0.3"
stable_deref_trait = "1.0.0"
[target.'cfg(windows)'.dependencies]
winapi = "0.2"
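Two things happen in this hunk: the `futures` and `futures-cpupool` requirements are loosened so that any `0.1.x` release satisfies them (rather than requiring at least `0.1.9` and `0.1.2` respectively), and three crates are added: `error-chain` backs the error-handling rewrite below, while `owning_ref` and `stable_deref_trait` back the buffer-ownership changes in the bitpacker and fast field readers.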

View File

@@ -2,7 +2,7 @@ use std::io::Write;
use std::io;
use common::serialize::BinarySerializable;
use std::mem;
use std::ops::Deref;
/// Computes the number of bits that will be used for bitpacking.
///
@@ -91,15 +91,18 @@ impl BitPacker {
pub struct BitUnpacker {
pub struct BitUnpacker<Data>
where Data: Deref<Target = [u8]>
{
num_bits: usize,
mask: u64,
data_ptr: *const u8,
data_len: usize,
data: Data,
}
impl BitUnpacker {
pub fn new(data: &[u8], num_bits: usize) -> BitUnpacker {
impl<Data> BitUnpacker<Data>
where Data: Deref<Target = [u8]>
{
pub fn new(data: Data, num_bits: usize) -> BitUnpacker<Data> {
let mask: u64 = if num_bits == 64 {
!0u64
} else {
@@ -108,8 +111,7 @@ impl BitUnpacker {
BitUnpacker {
num_bits: num_bits,
mask: mask,
data_ptr: data.as_ptr(),
data_len: data.len(),
data: data,
}
}
@@ -117,14 +119,18 @@ impl BitUnpacker {
if self.num_bits == 0 {
return 0;
}
let addr = (idx * self.num_bits) / 8;
let bit_shift = idx * self.num_bits - addr * 8;
let val_unshifted_unmasked: u64;
debug_assert!(addr + 8 <= self.data_len,
let data: &[u8] = &*self.data;
let num_bits = self.num_bits;
let mask = self.mask;
let addr_in_bits = idx * num_bits;
let addr = addr_in_bits >> 3;
let bit_shift = addr_in_bits & 7;
debug_assert!(addr + 8 <= data.len(),
"The fast field field should have been padded with 7 bytes.");
val_unshifted_unmasked = unsafe { *(self.data_ptr.offset(addr as isize) as *const u64) };
let val_unshifted_unmasked: u64 =
unsafe { *(data.as_ptr().offset(addr as isize) as *const u64) };
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
(val_shifted & self.mask)
(val_shifted & mask)
}
}
@@ -160,7 +166,7 @@ mod test {
let num_bytes = bitpacker.close(&mut data).unwrap();
assert_eq!(num_bytes, (num_bits * len + 7) / 8 + 7);
assert_eq!(data.len(), num_bytes);
let bitunpacker = BitUnpacker::new(&data, num_bits);
let bitunpacker = BitUnpacker::new(data, num_bits);
for (i, val) in vals.iter().enumerate() {
assert_eq!(bitunpacker.get(i), *val);
}
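The net effect of this hunk: `BitUnpacker` no longer stores a raw pointer and a length, but any buffer type that derefs to `[u8]`, so it can either borrow or own its data (the test above now hands it the owned `Vec<u8>` directly). A minimal standalone sketch of the pattern, with a hypothetical `Reader` type standing in for `BitUnpacker`:

use std::ops::Deref;

// Hypothetical type illustrating the `Deref<Target = [u8]>` pattern
// introduced above; not the tantivy API itself.
struct Reader<Data>
    where Data: Deref<Target = [u8]>
{
    data: Data,
}

impl<Data> Reader<Data>
    where Data: Deref<Target = [u8]>
{
    fn new(data: Data) -> Reader<Data> {
        Reader { data: data }
    }

    fn byte(&self, idx: usize) -> u8 {
        // `&*self.data` yields a plain `&[u8]`, whoever owns the bytes.
        (&*self.data)[idx]
    }
}

fn main() {
    let owned = Reader::new(vec![1u8, 2, 3]); // owns a `Vec<u8>`
    let bytes = [4u8, 5, 6];
    let borrowed = Reader::new(&bytes[..]); // borrows a `&[u8]`
    assert_eq!(owned.byte(0), 1);
    assert_eq!(borrowed.byte(2), 6);
}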

View File

@@ -1,5 +1,5 @@
use Result;
use Error;
use error::{ErrorKind, ResultExt};
use serde_json;
use schema::Schema;
use std::sync::Arc;
@@ -29,8 +29,7 @@ const NUM_SEARCHERS: usize = 12;
fn load_metas(directory: &Directory) -> Result<IndexMeta> {
let meta_data = directory.atomic_read(&META_FILEPATH)?;
let meta_string = String::from_utf8_lossy(&meta_data);
serde_json::from_str(&meta_string)
.map_err(|e| Error::CorruptedFile(META_FILEPATH.clone(), Box::new(e)))
serde_json::from_str(&meta_string).chain_err(|| ErrorKind::CorruptedFile(META_FILEPATH.clone()))
}
/// Tantivy's Search Index
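About the `chain_err` change above: instead of hand-building a `CorruptedFile` variant that boxes the cause, `chain_err` (from the error-chain crate) records the `serde_json` failure as the cause of the new `ErrorKind`. A sketch of how a caller could inspect the result, assuming the error-chain 0.8 API (`kind()` and `iter()`) and the `Error`/`ErrorKind` defined in the error.rs hunk later in this diff:

// Sketch only; `load_metas`, `Error`, and `ErrorKind` are the items
// shown elsewhere in this commit, and `directory` is assumed bound.
match load_metas(&directory) {
    Ok(metas) => println!("loaded metas: {:?}", metas),
    Err(err) => {
        // The kind carries the high-level classification...
        if let ErrorKind::CorruptedFile(ref path) = *err.kind() {
            println!("corrupted meta file: {:?}", path);
        }
        // ...while the original serde_json error survives as a cause.
        for cause in err.iter().skip(1) {
            println!("caused by: {}", cause);
        }
    }
}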

View File

@@ -1,5 +1,51 @@
use std::error::Error as StdError;
use std::path::PathBuf;
use std::io;
use std::fmt;
/// General IO error with an optional path to the offending file.
#[derive(Debug)]
pub struct IOError {
path: Option<PathBuf>,
err: io::Error,
}
impl fmt::Display for IOError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.path {
Some(ref path) => write!(f, "io error occurred on path '{:?}': '{}'", path, self.err),
None => write!(f, "io error occurred: '{}'", self.err),
}
}
}
impl StdError for IOError {
fn description(&self) -> &str {
"io error occurred"
}
fn cause(&self) -> Option<&StdError> {
Some(&self.err)
}
}
impl IOError {
pub(crate) fn with_path(path: PathBuf, err: io::Error) -> Self {
IOError {
path: Some(path),
err: err,
}
}
}
impl From<io::Error> for IOError {
fn from(err: io::Error) -> IOError {
IOError {
path: None,
err: err,
}
}
}
/// Error that may occur when opening a directory
#[derive(Debug)]
@@ -10,6 +56,29 @@ pub enum OpenDirectoryError {
NotADirectory(PathBuf),
}
impl fmt::Display for OpenDirectoryError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
OpenDirectoryError::DoesNotExist(ref path) => {
write!(f, "the underlying directory '{:?}' does not exist", path)
}
OpenDirectoryError::NotADirectory(ref path) => {
write!(f, "the path '{:?}' exists but is not a directory", path)
}
}
}
}
impl StdError for OpenDirectoryError {
fn description(&self) -> &str {
"error occurred while opening a directory"
}
fn cause(&self) -> Option<&StdError> {
None
}
}
/// Error that may occur when opening a file for writing
#[derive(Debug)]
pub enum OpenWriteError {
@@ -18,15 +87,43 @@ pub enum OpenWriteError {
FileAlreadyExists(PathBuf),
/// Any kind of IO error that happens when
/// writing in the underlying IO device.
IOError(io::Error),
IOError(IOError),
}
impl From<io::Error> for OpenWriteError {
fn from(err: io::Error) -> OpenWriteError {
impl From<IOError> for OpenWriteError {
fn from(err: IOError) -> OpenWriteError {
OpenWriteError::IOError(err)
}
}
impl fmt::Display for OpenWriteError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
OpenWriteError::FileAlreadyExists(ref path) => {
write!(f, "the file '{:?}' already exists", path)
}
OpenWriteError::IOError(ref err) => {
write!(f,
"an io error occurred while opening a file for writing: '{}'",
err)
}
}
}
}
impl StdError for OpenWriteError {
fn description(&self) -> &str {
"error occurred while opening a file for writing"
}
fn cause(&self) -> Option<&StdError> {
match *self {
OpenWriteError::FileAlreadyExists(_) => None,
OpenWriteError::IOError(ref err) => Some(err),
}
}
}
/// Error that may occur when opening a file for reading
#[derive(Debug)]
pub enum OpenReadError {
@@ -34,9 +131,42 @@ pub enum OpenReadError {
FileDoesNotExist(PathBuf),
/// Any kind of IO error that happens when
/// interacting with the underlying IO device.
IOError(io::Error),
IOError(IOError),
}
impl From<IOError> for OpenReadError {
fn from(err: IOError) -> OpenReadError {
OpenReadError::IOError(err)
}
}
impl fmt::Display for OpenReadError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
OpenReadError::FileDoesNotExist(ref path) => {
write!(f, "the file '{:?}' does not exist", path)
}
OpenReadError::IOError(ref err) => {
write!(f,
"an io error occurred while opening a file for reading: '{}'",
err)
}
}
}
}
impl StdError for OpenReadError {
fn description(&self) -> &str {
"error occurred while opening a file for reading"
}
fn cause(&self) -> Option<&StdError> {
match *self {
OpenReadError::FileDoesNotExist(_) => None,
OpenReadError::IOError(ref err) => Some(err),
}
}
}
/// Error that may occur when trying to delete a file
#[derive(Debug)]
@@ -45,8 +175,44 @@ pub enum DeleteError {
FileDoesNotExist(PathBuf),
/// Any kind of IO error that happens when
/// interacting with the underlying IO device.
IOError(io::Error),
IOError(IOError),
/// The file may not be deleted because it is
/// protected.
FileProtected(PathBuf),
}
impl From<IOError> for DeleteError {
fn from(err: IOError) -> DeleteError {
DeleteError::IOError(err)
}
}
impl fmt::Display for DeleteError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
DeleteError::FileDoesNotExist(ref path) => {
write!(f, "the file '{:?}' does not exist", path)
}
DeleteError::FileProtected(ref path) => {
write!(f, "the file '{:?}' is protected and can't be deleted", path)
}
DeleteError::IOError(ref err) => {
write!(f, "an io error occurred while deleting a file: '{}'", err)
}
}
}
}
impl StdError for DeleteError {
fn description(&self) -> &str {
"error occurred while deleting a file"
}
fn cause(&self) -> Option<&StdError> {
match *self {
DeleteError::FileDoesNotExist(_) |
DeleteError::FileProtected(_) => None,
DeleteError::IOError(ref err) => Some(err),
}
}
}
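Taken together, the `From<IOError>` impls mean a call site can attach the offending path once and let `?` lift the wrapper into whichever enum the function returns. A minimal sketch of the intended pattern, assuming crate-internal access (since `IOError::with_path` is `pub(crate)`):

use std::fs::File;
use std::io::Read;
use std::path::Path;

// Hypothetical helper; `IOError` and `OpenReadError` are the types
// defined above. Both `?`s compile thanks to
// `impl From<IOError> for OpenReadError`.
fn read_all(path: &Path) -> Result<Vec<u8>, OpenReadError> {
    let mut file = File::open(path)
        .map_err(|e| IOError::with_path(path.to_owned(), e))?;
    let mut buffer = Vec::new();
    file.read_to_end(&mut buffer)
        .map_err(|e| IOError::with_path(path.to_owned(), e))?;
    Ok(buffer)
}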

View File

@@ -1,6 +1,6 @@
use std::path::{Path, PathBuf};
use serde_json;
use directory::error::{OpenReadError, DeleteError, OpenWriteError};
use directory::error::{IOError, OpenReadError, DeleteError, OpenWriteError};
use directory::{ReadOnlySource, WritePtr};
use std::result;
use std::io;
@@ -12,8 +12,7 @@ use std::io::Write;
use core::MANAGED_FILEPATH;
use std::collections::HashMap;
use std::fmt;
use Result;
use Error;
use error::{Result, ErrorKind, ResultExt};
/// Wrapper of directories that keeps track of files created by Tantivy.
///
@@ -86,7 +85,7 @@ impl ManagedDirectory {
let managed_files_json = String::from_utf8_lossy(&data);
let managed_files: HashSet<PathBuf> =
serde_json::from_str(&managed_files_json)
.map_err(|e| Error::CorruptedFile(MANAGED_FILEPATH.clone(), Box::new(e)))?;
.chain_err(|| ErrorKind::CorruptedFile(MANAGED_FILEPATH.clone()))?;
Ok(ManagedDirectory {
directory: box directory,
meta_informations: Arc::new(RwLock::new(MetaInformation {
@@ -230,7 +229,8 @@ impl Directory for ManagedDirectory {
}
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
self.register_file_as_managed(path)?;
self.register_file_as_managed(path)
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
self.directory.open_write(path)
}

View File

@@ -1,7 +1,7 @@
use atomicwrites;
use common::make_io_err;
use directory::Directory;
use directory::error::{OpenWriteError, OpenReadError, DeleteError, OpenDirectoryError};
use directory::error::{IOError, OpenWriteError, OpenReadError, DeleteError, OpenDirectoryError};
use directory::ReadOnlySource;
use directory::shared_vec_slice::SharedVecSlice;
use directory::WritePtr;
@@ -24,13 +24,15 @@ use std::sync::Weak;
use tempdir::TempDir;
fn open_mmap(full_path: &PathBuf) -> result::Result<Option<Arc<Mmap>>, OpenReadError> {
let convert_file_error = |err: io::Error| if err.kind() == io::ErrorKind::NotFound {
OpenReadError::FileDoesNotExist(full_path.clone())
} else {
OpenReadError::IOError(err)
};
let file = File::open(&full_path).map_err(convert_file_error)?;
let meta_data = file.metadata().map_err(OpenReadError::IOError)?;
let file = File::open(&full_path)
.map_err(|e| if e.kind() == io::ErrorKind::NotFound {
OpenReadError::FileDoesNotExist(full_path.clone())
} else {
OpenReadError::IOError(IOError::with_path(full_path.to_owned(), e))
})?;
let meta_data = file.metadata()
.map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
if meta_data.len() == 0 {
// if the file size is 0, it will not be possible
// to mmap the file, so we return an anonymous mmap_cache
@@ -39,7 +41,7 @@ fn open_mmap(full_path: &PathBuf) -> result::Result<Option<Arc<Mmap>>, OpenReadE
}
match Mmap::open(&file, Protection::Read) {
Ok(mmap) => Ok(Some(Arc::new(mmap))),
Err(e) => Err(OpenReadError::IOError(e)),
Err(e) => Err(IOError::with_path(full_path.to_owned(), e))?,
}
}
@@ -274,7 +276,7 @@ impl Directory for MmapDirectory {
let msg = format!("Failed to acquired write lock \
on mmap cache while reading {:?}",
path);
OpenReadError::IOError(make_io_err(msg))
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
Ok(mmap_cache
@@ -295,17 +297,19 @@ impl Directory for MmapDirectory {
let mut file = open_res
.map_err(|err| if err.kind() == io::ErrorKind::AlreadyExists {
OpenWriteError::FileAlreadyExists(PathBuf::from(path))
OpenWriteError::FileAlreadyExists(path.to_owned())
} else {
OpenWriteError::IOError(err)
IOError::with_path(path.to_owned(), err).into()
})?;
// making sure the file is created.
try!(file.flush());
file.flush()
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
// Apparently, on some filesystems, syncing the parent
// directory is required.
try!(self.sync_directory());
self.sync_directory()
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
let writer = SafeFileWriter::new(file);
Ok(BufWriter::new(Box::new(writer)))
@@ -320,19 +324,22 @@ impl Directory for MmapDirectory {
let msg = format!("Failed to acquired write lock \
on mmap cache while deleting {:?}",
path);
DeleteError::IOError(make_io_err(msg))
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
// Remove the entry from the mmap cache.
// The munmap happens on Drop,
// once the last reference is gone.
mmap_cache.cache.remove(&full_path);
match fs::remove_file(&full_path) {
Ok(_) => self.sync_directory().map_err(DeleteError::IOError),
Ok(_) => {
self.sync_directory()
.map_err(|e| IOError::with_path(path.to_owned(), e).into())
}
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {
Err(DeleteError::FileDoesNotExist(path.to_owned()))
} else {
Err(DeleteError::IOError(e))
Err(IOError::with_path(path.to_owned(), e).into())
}
}
}
@@ -349,14 +356,14 @@ impl Directory for MmapDirectory {
match File::open(&full_path) {
Ok(mut file) => {
file.read_to_end(&mut buffer)
.map_err(OpenReadError::IOError)?;
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
Ok(buffer)
}
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {
Err(OpenReadError::FileDoesNotExist(path.to_owned()))
} else {
Err(OpenReadError::IOError(e))
Err(IOError::with_path(path.to_owned(), e).into())
}
}
}

View File

@@ -6,7 +6,7 @@ use std::result;
use std::sync::{Arc, RwLock};
use common::make_io_err;
use directory::{Directory, ReadOnlySource};
use directory::error::{OpenWriteError, OpenReadError, DeleteError};
use directory::error::{IOError, OpenWriteError, OpenReadError, DeleteError};
use directory::WritePtr;
use super::shared_vec_slice::SharedVecSlice;
@@ -97,7 +97,7 @@ impl InnerDirectory {
directory when trying to read {:?}",
path);
let io_err = make_io_err(msg);
OpenReadError::IOError(io_err)
OpenReadError::IOError(IOError::with_path(path.to_owned(), io_err))
})
.and_then(|readable_map| {
readable_map
@@ -115,7 +115,7 @@ impl InnerDirectory {
directory when trying to delete {:?}",
path);
let io_err = make_io_err(msg);
DeleteError::IOError(io_err)
DeleteError::IOError(IOError::with_path(path.to_owned(), io_err))
})
.and_then(|mut writable_map| match writable_map.remove(path) {
Some(_) => Ok(()),
@@ -163,8 +163,13 @@ impl Directory for RAMDirectory {
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
let path_buf = PathBuf::from(path);
let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone());
let exists = self.fs
.write(path_buf.clone(), &Vec::new())
.map_err(|err| IOError::with_path(path.to_owned(), err))?;
// force the creation of the file to mimic the MMap directory.
if try!(self.fs.write(path_buf.clone(), &Vec::new())) {
if exists {
Err(OpenWriteError::FileAlreadyExists(path_buf))
} else {
Ok(BufWriter::new(Box::new(vec_writer)))

View File

@@ -2,7 +2,7 @@ use fst::raw::MmapReadOnly;
use std::ops::Deref;
use super::shared_vec_slice::SharedVecSlice;
use common::HasLen;
use stable_deref_trait::StableDeref;
/// Read object that represents files in tantivy.
///
@@ -17,6 +17,8 @@ pub enum ReadOnlySource {
Anonymous(SharedVecSlice),
}
unsafe impl StableDeref for ReadOnlySource {}
impl Deref for ReadOnlySource {
type Target = [u8];
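The one-line `unsafe impl StableDeref for ReadOnlySource` asserts that dereferencing a `ReadOnlySource` always yields the same address even when the source value itself is moved, which is exactly what `owning_ref::OwningRef` needs to bundle an owner together with a slice into it (owning_ref accepts the trait via its `StableAddress` bound). A sketch of the combination that the fast field reader below relies on:

use owning_ref::OwningRef;

// Keep the `ReadOnlySource` alive while exposing only the payload
// after a 16-byte header, as `U64FastFieldReader::open` does below.
fn skip_header(source: ReadOnlySource) -> OwningRef<ReadOnlySource, [u8]> {
    // Sound only because of the `StableDeref` impl above: moving
    // `source` into the `OwningRef` must not move the bytes it
    // derefs to.
    OwningRef::new(source).map(|bytes| &bytes[16..])
}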

View File

@@ -1,89 +1,122 @@
/// Definition of Tantivy's error and result.
//! Definition of Tantivy's error and result.
use std::io;
use std::path::PathBuf;
use std::error;
use std::sync::PoisonError;
use directory::error::{OpenReadError, OpenWriteError, OpenDirectoryError};
use directory::error::{IOError, OpenReadError, OpenWriteError, OpenDirectoryError};
use query;
use schema;
use fastfield::FastFieldNotAvailableError;
use serde_json;
/// Generic tantivy error.
///
/// Any specialized error returned in tantivy can be converted into `tantivy::Error`.
#[derive(Debug)]
pub enum Error {
/// Path does not exist.
PathDoesNotExist(PathBuf),
/// File already exists, this is a problem when we try to write into a new file.
FileAlreadyExists(PathBuf),
/// IO Error
IOError(io::Error),
/// A thread holding the lock panicked and poisoned it.
Poisoned,
/// The data within is corrupted.
///
/// For instance, it contains invalid JSON.
CorruptedFile(PathBuf, Box<error::Error + Send + Sync>),
/// Invalid argument was passed by the user.
InvalidArgument(String),
/// An error happened in one of the threads
ErrorInThread(String),
/// An Error appeared related to the lack of a field.
SchemaError(String),
/// Tried to access a fastfield reader for a field not configured accordingly.
FastFieldError(FastFieldNotAvailableError),
}
error_chain!(
errors {
/// Path does not exist.
PathDoesNotExist(buf: PathBuf) {
description("path does not exist")
display("path does not exist: '{:?}'", buf)
}
/// File already exists, this is a problem when we try to write into a new file.
FileAlreadyExists(buf: PathBuf) {
description("file already exists")
display("file already exists: '{:?}'", buf)
}
/// IO Error.
IOError(err: IOError) {
description("an IO error occurred")
display("an IO error occurred: '{}'", err)
}
/// The data within is corrupted.
///
/// For instance, it contains invalid JSON.
CorruptedFile(buf: PathBuf) {
description("file contains corrupted data")
display("file contains corrupted data: '{:?}'", buf)
}
/// A thread holding the lock panicked and poisoned it.
Poisoned {
description("a thread holding the lock panicked and poisoned it")
}
/// Invalid argument was passed by the user.
InvalidArgument(arg: String) {
description("an invalid argument was passed")
display("an invalid argument was passed: '{}'", arg)
}
/// An error happened in one of the threads.
ErrorInThread(err: String) {
description("an error occurred in a thread")
display("an error occurred in a thread: '{}'", err)
}
/// An Error appeared related to the lack of a field.
SchemaError(field: String) {
description("a schema field is missing")
display("a schema field is missing: '{}'", field)
}
/// Tried to access a fastfield reader for a field not configured accordingly.
FastFieldError(err: FastFieldNotAvailableError) {
description("fast field not available")
display("fast field not available: '{:?}'", err)
}
}
);
impl From<FastFieldNotAvailableError> for Error {
fn from(fastfield_error: FastFieldNotAvailableError) -> Error {
Error::FastFieldError(fastfield_error)
ErrorKind::FastFieldError(fastfield_error).into()
}
}
impl From<IOError> for Error {
fn from(io_error: IOError) -> Error {
ErrorKind::IOError(io_error).into()
}
}
impl From<io::Error> for Error {
fn from(io_error: io::Error) -> Error {
Error::IOError(io_error)
ErrorKind::IOError(io_error.into()).into()
}
}
impl From<query::QueryParserError> for Error {
fn from(parsing_error: query::QueryParserError) -> Error {
Error::InvalidArgument(format!("Query is invalid. {:?}", parsing_error))
ErrorKind::InvalidArgument(format!("Query is invalid. {:?}", parsing_error)).into()
}
}
impl<Guard> From<PoisonError<Guard>> for Error {
fn from(_: PoisonError<Guard>) -> Error {
Error::Poisoned
ErrorKind::Poisoned.into()
}
}
impl From<OpenReadError> for Error {
fn from(error: OpenReadError) -> Error {
match error {
OpenReadError::FileDoesNotExist(filepath) => Error::PathDoesNotExist(filepath),
OpenReadError::IOError(io_error) => Error::IOError(io_error),
OpenReadError::FileDoesNotExist(filepath) => {
ErrorKind::PathDoesNotExist(filepath).into()
}
OpenReadError::IOError(io_error) => ErrorKind::IOError(io_error).into(),
}
}
}
impl From<schema::DocParsingError> for Error {
fn from(error: schema::DocParsingError) -> Error {
Error::InvalidArgument(format!("Failed to parse document {:?}", error))
ErrorKind::InvalidArgument(format!("Failed to parse document {:?}", error)).into()
}
}
impl From<OpenWriteError> for Error {
fn from(error: OpenWriteError) -> Error {
match error {
OpenWriteError::FileAlreadyExists(filepath) => Error::FileAlreadyExists(filepath),
OpenWriteError::IOError(io_error) => Error::IOError(io_error),
}
OpenWriteError::FileAlreadyExists(filepath) => {
ErrorKind::FileAlreadyExists(filepath)
}
OpenWriteError::IOError(io_error) => ErrorKind::IOError(io_error),
}
.into()
}
}
@@ -91,10 +124,11 @@ impl From<OpenDirectoryError> for Error {
fn from(error: OpenDirectoryError) -> Error {
match error {
OpenDirectoryError::DoesNotExist(directory_path) => {
Error::PathDoesNotExist(directory_path)
ErrorKind::PathDoesNotExist(directory_path).into()
}
OpenDirectoryError::NotADirectory(directory_path) => {
Error::InvalidArgument(format!("{:?} is not a directory", directory_path))
ErrorKind::InvalidArgument(format!("{:?} is not a directory", directory_path))
.into()
}
}
}
@@ -102,6 +136,7 @@ impl From<OpenDirectoryError> for Error {
impl From<serde_json::Error> for Error {
fn from(error: serde_json::Error) -> Error {
Error::IOError(error.into())
let io_err = io::Error::from(error);
ErrorKind::IOError(io_err.into()).into()
}
}
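For orientation: per error-chain 0.8, the `error_chain!` invocation above generates an `Error` tuple struct whose first field is the `ErrorKind`, a module-local `Result<T>` alias, the `ResultExt` trait providing `chain_err`, and `From<ErrorKind> for Error` (which is why the bare `.into()` calls in the `From` impls compile). It also enables the `bail!` macro used in the merger and segment updater hunks below; a hedged sketch with a hypothetical function:

// Hypothetical function, not part of this commit; `Result` and
// `ErrorKind` are the items generated above.
fn check_heap_size(heap_size: usize) -> Result<usize> {
    if heap_size < 1_000_000 {
        // `bail!(kind)` expands to `return Err(kind.into());`.
        bail!(ErrorKind::InvalidArgument(format!("heap too small: {}", heap_size)));
    }
    Ok(heap_size)
}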

View File

@@ -13,6 +13,7 @@ use common::bitpacker::compute_num_bits;
use common::bitpacker::BitUnpacker;
use schema::FieldType;
use common;
use owning_ref::OwningRef;
/// Trait for accessing a fastfield.
///
@@ -39,8 +40,7 @@ pub trait FastFieldReader: Sized {
/// `FastFieldReader` for unsigned 64-bits integers.
pub struct U64FastFieldReader {
_data: ReadOnlySource,
bit_unpacker: BitUnpacker,
bit_unpacker: BitUnpacker<OwningRef<ReadOnlySource, [u8]>>,
min_value: u64,
max_value: u64,
}
@@ -85,25 +85,23 @@ impl FastFieldReader for U64FastFieldReader {
/// Panics if the data is corrupted.
fn open(data: ReadOnlySource) -> U64FastFieldReader {
let min_value: u64;
let max_value: u64;
let bit_unpacker: BitUnpacker;
let amplitude: u64;
{
let mut cursor: &[u8] = data.as_slice();
let mut cursor = data.as_slice();
min_value = u64::deserialize(&mut cursor)
.expect("Failed to read the min_value of fast field.");
let amplitude = u64::deserialize(&mut cursor)
amplitude = u64::deserialize(&mut cursor)
.expect("Failed to read the amplitude of fast field.");
max_value = min_value + amplitude;
let num_bits = compute_num_bits(amplitude);
bit_unpacker = BitUnpacker::new(cursor, num_bits as usize)
}
}
let max_value = min_value + amplitude;
let num_bits = compute_num_bits(amplitude);
let owning_ref = OwningRef::new(data).map(|data| &data[16..]);
let bit_unpacker = BitUnpacker::new(owning_ref, num_bits as usize);
U64FastFieldReader {
_data: data,
bit_unpacker: bit_unpacker,
min_value: min_value,
max_value: max_value,
bit_unpacker: bit_unpacker,
}
}
}
@@ -161,6 +159,12 @@ impl I64FastFieldReader {
impl FastFieldReader for I64FastFieldReader {
type ValueType = i64;
///
/// # Panics
///
/// May panic or return an arbitrary wrong result if `doc`
/// is greater than or equal to the segment's `maxdoc`.
fn get(&self, doc: DocId) -> i64 {
common::u64_to_i64(self.underlying.get(doc))
}
@@ -244,8 +248,3 @@ impl FastFieldsReader {
})
}
}
unsafe impl Send for U64FastFieldReader {}
unsafe impl Sync for U64FastFieldReader {}
unsafe impl Send for I64FastFieldReader {}
unsafe impl Sync for I64FastFieldReader {}
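Deleting the four `unsafe impl Send/Sync` lines is a direct payoff of this refactoring: with the raw `data_ptr` gone and only owned fields left, the auto traits can apply on their own instead of being asserted. Separately, `I64FastFieldReader::get` leans on `common::u64_to_i64`; a sketch of that order-preserving mapping, assuming tantivy uses the standard sign-bit flip (which is what makes min/max and bit widths computed on `u64` valid for `i64` values):

// Hedged sketch of the sign-bit-flip mapping assumed above.
const HIGHEST_BIT: u64 = 1 << 63;

fn i64_to_u64(val: i64) -> u64 {
    (val as u64) ^ HIGHEST_BIT
}

fn u64_to_i64(val: u64) -> i64 {
    (val ^ HIGHEST_BIT) as i64
}

fn main() {
    // Order is preserved across the whole i64 range...
    assert!(i64_to_u64(-1) < i64_to_u64(0));
    assert!(i64_to_u64(0) < i64_to_u64(1));
    // ...and the mapping round-trips.
    assert_eq!(u64_to_i64(i64_to_u64(-42)), -42);
}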

View File

@@ -9,7 +9,7 @@ use core::SegmentReader;
use indexer::stamper::Stamper;
use datastruct::stacker::Heap;
use directory::FileProtection;
use Error;
use error::{Error, ErrorKind, Result, ResultExt};
use Directory;
use fastfield::write_delete_bitset;
use indexer::delete_queue::{DeleteCursor, DeleteQueue};
@@ -22,7 +22,6 @@ use indexer::SegmentEntry;
use indexer::SegmentWriter;
use postings::DocSet;
use postings::SegmentPostingsOption;
use Result;
use schema::Document;
use schema::Schema;
use schema::Term;
@@ -325,19 +324,17 @@ impl IndexWriter {
let former_workers_handles = mem::replace(&mut self.workers_join_handle, vec![]);
for join_handle in former_workers_handles {
try!(join_handle
.join()
.expect("Indexing Worker thread panicked")
.map_err(|e| {
Error::ErrorInThread(format!("Error in indexing worker thread. {:?}", e))
}));
join_handle
.join()
.expect("Indexing Worker thread panicked")
.chain_err(|| ErrorKind::ErrorInThread("Error in indexing worker thread.".into()))?;
}
drop(self.workers_join_handle);
let result =
self.segment_updater
.wait_merging_thread()
.map_err(|_| Error::ErrorInThread("Failed to join merging thread.".to_string()));
.chain_err(|| ErrorKind::ErrorInThread("Failed to join merging thread.".into()));
if let Err(ref e) = result {
error!("Some merging thread failed {:?}", e);
@@ -527,12 +524,13 @@ impl IndexWriter {
for worker_handle in former_workers_join_handle {
let indexing_worker_result =
try!(worker_handle
.join()
.map_err(|e| Error::ErrorInThread(format!("{:?}", e))));
try!(indexing_worker_result);
worker_handle
.join()
.map_err(|e| Error::from_kind(ErrorKind::ErrorInThread(format!("{:?}", e))))?;
indexing_worker_result?;
// add a new worker for the next generation.
try!(self.add_indexing_worker());
self.add_indexing_worker()?;
}
@@ -603,7 +601,7 @@ mod tests {
use schema::{self, Document};
use Index;
use Term;
use Error;
use error::*;
use env_logger;
#[test]
@@ -612,7 +610,7 @@ mod tests {
let index = Index::create_in_ram(schema_builder.build());
let _index_writer = index.writer(40_000_000).unwrap();
match index.writer(40_000_000) {
Err(Error::FileAlreadyExists(_)) => {}
Err(Error(ErrorKind::FileAlreadyExists(_), _)) => {}
_ => panic!("Expected FileAlreadyExists error"),
}
}
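The new test pattern works because error-chain generates `Error` as a tuple struct whose first field is the `ErrorKind` (the second holds cause and backtrace state), so `Error(ErrorKind::FileAlreadyExists(_), _)` destructures it directly; matching on `err.kind()`, as sketched earlier, is the non-destructuring equivalent.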

View File

@@ -1,4 +1,4 @@
use {Error, Result};
use error::{ErrorKind, Result};
use core::SegmentReader;
use core::Segment;
use DocId;
@@ -161,7 +161,7 @@ impl IndexMerger {
let error_msg = format!("Failed to find a u64_reader for field {:?}",
field);
error!("{}", error_msg);
return Err(Error::SchemaError(error_msg));
bail!(ErrorKind::SchemaError(error_msg));
}
}
}

View File

@@ -7,7 +7,7 @@ use core::SegmentMeta;
use core::SerializableSegment;
use directory::Directory;
use indexer::stamper::Stamper;
use Error;
use error::{Error, ErrorKind, Result};
use futures_cpupool::CpuPool;
use futures::Future;
use futures::Canceled;
@@ -19,7 +19,6 @@ use indexer::MergeCandidate;
use indexer::merger::IndexMerger;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use Result;
use futures_cpupool::CpuFuture;
use serde_json;
use indexer::delete_queue::DeleteCursor;
@@ -117,7 +116,7 @@ fn perform_merge(segment_ids: &[SegmentId],
error!("Error, had to abort merge as some of the segment is not managed anymore.");
let msg = format!("Segment {:?} requested for merge is not managed.",
segment_id);
return Err(Error::InvalidArgument(msg));
bail!(ErrorKind::InvalidArgument(msg));
}
}
@@ -448,7 +447,7 @@ impl SegmentUpdater {
merging_thread_handle
.join()
.map(|_| ())
.map_err(|_| Error::ErrorInThread("Merging thread failed.".to_string()))?
.map_err(|_| ErrorKind::ErrorInThread("Merging thread failed.".into()))?;
}
// Our merging thread may have queued their completed merge operations.
self.run_async(move |_| {}).wait()?;

View File

@@ -35,6 +35,9 @@ extern crate serde_derive;
#[macro_use]
extern crate log;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate version;
extern crate fst;
@@ -58,6 +61,8 @@ extern crate crossbeam;
extern crate bit_set;
extern crate futures;
extern crate futures_cpupool;
extern crate owning_ref;
extern crate stable_deref_trait;
#[cfg(test)]
extern crate env_logger;
@@ -83,7 +88,7 @@ mod functional_test;
#[macro_use]
mod macros;
pub use error::Error;
pub use error::{Error, ErrorKind, ResultExt};
/// Tantivy result.
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -186,7 +186,8 @@ impl PostingsSerializer {
// On the other hand, positions are entirely buffered until the
// end of the term, at which point they are compressed and written.
if self.text_indexing_options.is_position_enabled() {
self.written_bytes_positions += try!(VInt(self.position_deltas.len() as u64)
self.written_bytes_positions +=
try!(VInt(self.position_deltas.len() as u64)
.serialize(&mut self.positions_write));
let positions_encoded: &[u8] = self.positions_encoder
.compress_unsorted(&self.position_deltas[..]);

View File

@@ -37,8 +37,7 @@ impl Query for BooleanQuery {
}
fn weight(&self, searcher: &Searcher) -> Result<Box<Weight>> {
let sub_weights =
try!(self.subqueries
let sub_weights = try!(self.subqueries
.iter()
.map(|&(ref _occur, ref subquery)| subquery.weight(searcher))
.collect());
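This hunk and the next rely on the same idiom: an iterator of `Result<T, E>` collects into `Result<Vec<T>, E>`, short-circuiting on the first error, so a single `try!` covers every subquery weight. A standalone sketch with hypothetical names:

use std::num::ParseIntError;

// Hypothetical helper demonstrating collect-into-Result.
fn parse_all(inputs: &[&str]) -> Result<Vec<i64>, ParseIntError> {
    inputs.iter()
        .map(|s| s.parse::<i64>()) // each item is a `Result`
        .collect() // the first `Err` aborts and is returned
}

fn main() {
    assert_eq!(parse_all(&["1", "2"]), Ok(vec![1, 2]));
    assert!(parse_all(&["1", "x"]).is_err());
}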

View File

@@ -22,7 +22,8 @@ impl BooleanWeight {
impl Weight for BooleanWeight {
fn scorer<'a>(&'a self, reader: &'a SegmentReader) -> Result<Box<Scorer + 'a>> {
let sub_scorers: Vec<Box<Scorer + 'a>> = try!(self.weights
let sub_scorers: Vec<Box<Scorer + 'a>> =
try!(self.weights
.iter()
.map(|weight| weight.scorer(reader))
.collect());

View File

@@ -48,6 +48,7 @@ impl<'a, V> Ord for HeapItem<'a, V>
/// - the term
/// - a slice with the ordinal of the segments containing
/// the terms.
#[allow(should_implement_trait)]
pub struct TermMerger<'a, V>
where V: 'a + BinarySerializable + Default
{
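(A likely reading of the new `#[allow(should_implement_trait)]`: it silences the Clippy lint of that name, which fires when an inherent method has the signature of a standard trait method; `TermMerger` presumably exposes a streaming `next` that resembles `Iterator::next` but cannot implement the trait, since its items borrow from the merger itself.)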