Change Footer version handling, Make compression dynamic (#1060)

Change Footer version handling, Make compression dynamic

Change Footer version handling
Simplify version handling by switching to JSON instead of binary serialization.
fixes #1058

Make compression dynamic
Instead of choosing the compression during compile time via a feature flag, you can now have multiple compression algorithms enabled and decide during runtime which one to choose via IndexSettings. Changing the compression algorithm on an index is also supported. The information which algorithm was used in the doc store is stored in the DocStoreFooter. The default is the lz4 block format.
fixes #904

Handle merging of different compressors
Fix feature flag names
Add doc store test for all compressors
This commit is contained in:
PSeitz
2021-05-28 07:57:20 +02:00
committed by GitHub
parent 4afba005f9
commit 8d32c3ba3a
26 changed files with 565 additions and 478 deletions

View File

@@ -12,7 +12,13 @@ Tantivy 0.15.0
- Simplified positions index format (@fulmicoton) #1022
- Moved bitpacking to bitpacker subcrate and add BlockedBitpacker, which bitpacks blocks of 128 elements (@PSeitz) #1030
- Added support for more-like-this query in tantivy (@evanxg852000) #1011
- Added support for sorting an index, e.g presorting documents in an index by a timestamp field. This can heavily improve performance for certain scenarios, by utilizing the sorted data (Top-n optimizations). #1026
- Added support for sorting an index, e.g presorting documents in an index by a timestamp field. This can heavily improve performance for certain scenarios, by utilizing the sorted data (Top-n optimizations)(@PSeitz). #1026
- Add iterator over documents in doc store (@PSeitz). #1044
- Fix log merge policy (@PSeitz). #1043
- Add detection to avoid small doc store blocks on merge (@PSeitz). #1054
- Make doc store compression dynamic (@PSeitz). #1060
- Switch to json for footer version handling (@PSeitz). #1060
Tantivy 0.14.0
=========================

View File

@@ -21,7 +21,6 @@ regex ={ version = "1.5.4", default-features = false, features = ["std"] }
tantivy-fst = "0.3"
memmap = {version = "0.7", optional=true}
lz4_flex = { version = "0.8.0", default-features = false, features = ["checked-decode"], optional = true }
lz4 = { version = "1.23.2", optional = true }
brotli = { version = "3.3", optional = true }
snap = { version = "1.0.5", optional = true }
tempfile = { version = "3.2", optional = true }
@@ -77,12 +76,13 @@ debug-assertions = true
overflow-checks = true
[features]
default = ["mmap", "lz4-block-compression" ]
default = ["mmap", "lz4-compression" ]
mmap = ["fs2", "tempfile", "memmap"]
brotli-compression = ["brotli"]
lz4-compression = ["lz4"]
lz4-block-compression = ["lz4_flex"]
lz4-compression = ["lz4_flex"]
snappy-compression = ["snap"]
failpoints = ["fail/failpoints"]
unstable = [] # useful for benches.
wasm-bindgen = ["uuid/wasm-bindgen"]

View File

@@ -18,5 +18,6 @@ install:
build: false
test_script:
- REM SET RUST_LOG=tantivy,test & cargo test --all --verbose --no-default-features --features lz4-block-compression --features mmap
- REM SET RUST_LOG=tantivy,test & cargo test --all --verbose --no-default-features --features lz4-compression --features mmap
- REM SET RUST_LOG=tantivy,test & cargo test test_store --verbose --no-default-features --features lz4-compression --features snappy-compression --features brotli-compression --features mmap
- REM SET RUST_BACKTRACE=1 & cargo build --examples

View File

@@ -8,7 +8,7 @@ pub use self::bitset::BitSet;
pub(crate) use self::bitset::TinySet;
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::counting_writer::CountingWriter;
pub use self::serialize::{BinarySerializable, FixedSize};
pub use self::serialize::{BinarySerializable, DeserializeFrom, FixedSize};
pub use self::vint::{
read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt,
};

View File

@@ -14,6 +14,20 @@ pub trait BinarySerializable: fmt::Debug + Sized {
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}
pub trait DeserializeFrom<T: BinarySerializable> {
fn deserialize(&mut self) -> io::Result<T>;
}
/// Implement deserialize from &[u8] for all types which implement BinarySerializable.
///
/// TryFrom would actually be preferrable, but not possible because of the orphan
/// rules (not completely sure if this could be resolved)
impl<T: BinarySerializable> DeserializeFrom<T> for &[u8] {
fn deserialize(&mut self) -> io::Result<T> {
T::deserialize(self)
}
}
/// `FixedSize` marks a `BinarySerializable` as
/// always serializing to the same size.
pub trait FixedSize: BinarySerializable {
@@ -61,6 +75,11 @@ impl<Left: BinarySerializable, Right: BinarySerializable> BinarySerializable for
Ok((Left::deserialize(reader)?, Right::deserialize(reader)?))
}
}
impl<Left: BinarySerializable + FixedSize, Right: BinarySerializable + FixedSize> FixedSize
for (Left, Right)
{
const SIZE_IN_BYTES: usize = Left::SIZE_IN_BYTES + Right::SIZE_IN_BYTES;
}
impl BinarySerializable for u32 {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {

View File

@@ -76,7 +76,7 @@ fn load_metas(
/// );
///
/// let schema = schema_builder.build();
/// let settings = IndexSettings{sort_by_field: Some(IndexSortByField{field:"number".to_string(), order:Order::Asc})};
/// let settings = IndexSettings{sort_by_field: Some(IndexSortByField{field:"number".to_string(), order:Order::Asc}), ..Default::default()};
/// let index = Index::builder().schema(schema).settings(settings).create_in_ram();
///
/// ```
@@ -173,7 +173,7 @@ impl IndexBuilder {
&directory,
)?;
let mut metas = IndexMeta::with_schema(self.get_expect_schema()?);
metas.index_settings = self.index_settings.clone();
metas.index_settings = self.index_settings;
let index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default());
Ok(index)
}
@@ -460,6 +460,13 @@ impl Index {
pub fn settings(&self) -> &IndexSettings {
&self.settings
}
/// Accessor to the index settings
///
pub fn settings_mut(&mut self) -> &mut IndexSettings {
&mut self.settings
}
/// Accessor to the index schema
///
/// The schema is actually cloned.

View File

@@ -1,7 +1,7 @@
use super::SegmentComponent;
use crate::core::SegmentId;
use crate::schema::Schema;
use crate::Opstamp;
use crate::{core::SegmentId, store::Compressor};
use census::{Inventory, TrackedObject};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
@@ -233,7 +233,11 @@ impl InnerSegmentMeta {
pub struct IndexSettings {
/// Sorts the documents by information
/// provided in `IndexSortByField`
#[serde(skip_serializing_if = "Option::is_none")]
pub sort_by_field: Option<IndexSortByField>,
/// The `Compressor` used to compress the doc store.
#[serde(default)]
pub docstore_compression: Compressor,
}
/// Settings to presort the documents in an index
///
@@ -380,6 +384,7 @@ mod tests {
field: "text".to_string(),
order: Order::Asc,
}),
..Default::default()
},
segments: Vec::new(),
schema,
@@ -389,7 +394,7 @@ mod tests {
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"}},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4"},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#
);
}
}

View File

@@ -147,6 +147,13 @@ impl FileSlice {
self.slice(from_offset..self.len())
}
/// Returns a slice from the end.
///
/// Equivalent to `.slice(self.len() - from_offset, self.len())`
pub fn slice_from_end(&self, from_offset: usize) -> FileSlice {
self.slice(self.len() - from_offset..self.len())
}
/// Like `.slice(...)` but enforcing only the `to`
/// boundary.
///

View File

@@ -1,69 +1,45 @@
use crate::common::{BinarySerializable, CountingWriter, FixedSize, HasLen, VInt};
use crate::directory::error::Incompatibility;
use crate::directory::FileSlice;
use crate::directory::{AntiCallToken, TerminatingWrite};
use crate::Version;
use crate::{
common::{BinarySerializable, CountingWriter, DeserializeFrom, FixedSize, HasLen},
directory::{AntiCallToken, TerminatingWrite},
Version, INDEX_FORMAT_VERSION,
};
use crc32fast::Hasher;
use serde::{Deserialize, Serialize};
use std::io;
use std::io::Write;
const FOOTER_MAX_LEN: usize = 10_000;
const FOOTER_MAX_LEN: u32 = 50_000;
/// The magic byte of the footer to identify corruption
/// or an old version of the footer.
const FOOTER_MAGIC_NUMBER: u32 = 1337;
type CrcHashU32 = u32;
#[derive(Debug, Clone, PartialEq)]
/// A Footer is appended to every file
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Footer {
pub version: Version,
pub meta: String,
pub versioned_footer: VersionedFooter,
}
/// Serialises the footer to a byte-array
/// - versioned_footer_len : 4 bytes
///- versioned_footer: variable bytes
/// - meta_len: 4 bytes
/// - meta: variable bytes
/// - version_len: 4 bytes
/// - version json: variable bytes
impl BinarySerializable for Footer {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
BinarySerializable::serialize(&self.versioned_footer, writer)?;
BinarySerializable::serialize(&self.meta, writer)?;
let version_string =
serde_json::to_string(&self.version).map_err(|_err| io::ErrorKind::InvalidInput)?;
BinarySerializable::serialize(&version_string, writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let versioned_footer = VersionedFooter::deserialize(reader)?;
let meta = String::deserialize(reader)?;
let version_json = String::deserialize(reader)?;
let version = serde_json::from_str(&version_json)?;
Ok(Footer {
version,
meta,
versioned_footer,
})
}
pub crc: CrcHashU32,
}
impl Footer {
pub fn new(versioned_footer: VersionedFooter) -> Self {
pub fn new(crc: CrcHashU32) -> Self {
let version = crate::VERSION.clone();
let meta = version.to_string();
Footer {
version,
meta,
versioned_footer,
}
Footer { version, crc }
}
pub fn crc(&self) -> CrcHashU32 {
self.crc
}
pub fn append_footer<W: io::Write>(&self, mut write: &mut W) -> io::Result<()> {
let mut counting_write = CountingWriter::wrap(&mut write);
self.serialize(&mut counting_write)?;
let written_len = counting_write.written_bytes();
(written_len as u32).serialize(write)?;
counting_write.write_all(serde_json::to_string(&self)?.as_ref())?;
let footer_payload_len = counting_write.written_bytes();
BinarySerializable::serialize(&(footer_payload_len as u32), write)?;
BinarySerializable::serialize(&(FOOTER_MAGIC_NUMBER as u32), write)?;
Ok(())
}
@@ -77,12 +53,47 @@ impl Footer {
),
));
}
let (body_footer, footer_len_file) = file.split_from_end(u32::SIZE_IN_BYTES);
let mut footer_len_bytes = footer_len_file.read_bytes()?;
let footer_len = u32::deserialize(&mut footer_len_bytes)? as usize;
let (body, footer) = body_footer.split_from_end(footer_len);
let mut footer_bytes = footer.read_bytes()?;
let footer = Footer::deserialize(&mut footer_bytes)?;
let footer_metadata_len = <(u32, u32)>::SIZE_IN_BYTES;
let (footer_len, footer_magic_byte): (u32, u32) = file
.slice_from_end(footer_metadata_len)
.read_bytes()?
.as_ref()
.deserialize()?;
if footer_magic_byte != FOOTER_MAGIC_NUMBER {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Footer magic byte mismatch. File corrupted or index was created using old an tantivy version which is not supported anymore. Please use tantivy 0.15 or above to recreate the index.",
));
}
if footer_len > FOOTER_MAX_LEN {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Footer seems invalid as it suggests a footer len of {}. File is corrupted, \
or the index was created with a different & old version of tantivy.",
footer_len
),
));
}
let total_footer_size = footer_len as usize + footer_metadata_len;
if file.len() < total_footer_size {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"File corrupted. The file is smaller than it's footer bytes (len={}).",
total_footer_size
),
));
}
let footer: Footer = serde_json::from_slice(&file.read_bytes_slice(
file.len() - total_footer_size..file.len() - footer_metadata_len as usize,
)?)?;
let body = file.slice_to(file.len() - total_footer_size);
Ok((footer, body))
}
@@ -90,151 +101,16 @@ impl Footer {
/// Has to be called after `extract_footer` to make sure it's not accessing uninitialised memory
pub fn is_compatible(&self) -> Result<(), Incompatibility> {
let library_version = crate::version();
match &self.versioned_footer {
VersionedFooter::V1 {
crc32: _crc,
store_compression,
} => {
if &library_version.store_compression != store_compression {
return Err(Incompatibility::CompressionMismatch {
library_compression_format: library_version.store_compression.to_string(),
index_compression_format: store_compression.to_string(),
});
}
Ok(())
}
VersionedFooter::V2 {
crc32: _crc,
store_compression,
} => {
if &library_version.store_compression != store_compression {
return Err(Incompatibility::CompressionMismatch {
library_compression_format: library_version.store_compression.to_string(),
index_compression_format: store_compression.to_string(),
});
}
Ok(())
}
VersionedFooter::V3 {
crc32: _crc,
store_compression,
} => {
if &library_version.store_compression != store_compression {
return Err(Incompatibility::CompressionMismatch {
library_compression_format: library_version.store_compression.to_string(),
index_compression_format: store_compression.to_string(),
});
}
Ok(())
}
VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch {
if self.version.index_format_version < 4
|| self.version.index_format_version > INDEX_FORMAT_VERSION
{
return Err(Incompatibility::IndexMismatch {
library_version: library_version.clone(),
index_version: self.version.clone(),
}),
});
}
}
}
/// Footer that includes a crc32 hash that enables us to checksum files in the index
#[derive(Debug, Clone, PartialEq)]
pub enum VersionedFooter {
UnknownVersion,
V1 {
crc32: CrcHashU32,
store_compression: String,
},
// Introduction of the Block WAND information.
V2 {
crc32: CrcHashU32,
store_compression: String,
},
// Block wand max termfred on 1 byte
V3 {
crc32: CrcHashU32,
store_compression: String,
},
}
impl BinarySerializable for VersionedFooter {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
let mut buf = Vec::new();
match self {
VersionedFooter::V3 {
crc32,
store_compression: compression,
} => {
// Serializes a valid `VersionedFooter` or panics if the version is unknown
// [ version | crc_hash | compression_mode ]
// [ 0..4 | 4..8 | variable ]
BinarySerializable::serialize(&3u32, &mut buf)?;
BinarySerializable::serialize(crc32, &mut buf)?;
BinarySerializable::serialize(compression, &mut buf)?;
}
VersionedFooter::V2 { .. }
| VersionedFooter::V1 { .. }
| VersionedFooter::UnknownVersion => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Cannot serialize an unknown versioned footer ",
));
}
}
BinarySerializable::serialize(&VInt(buf.len() as u64), writer)?;
assert!(buf.len() <= FOOTER_MAX_LEN);
writer.write_all(&buf[..])?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let len = VInt::deserialize(reader)?.0 as usize;
if len > FOOTER_MAX_LEN {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Footer seems invalid as it suggests a footer len of {}. File is corrupted, \
or the index was created with a different & old version of tantivy.",
len
),
));
}
let mut buf = vec![0u8; len];
reader.read_exact(&mut buf[..])?;
let mut cursor = &buf[..];
let version = u32::deserialize(&mut cursor)?;
if version > 3 {
return Ok(VersionedFooter::UnknownVersion);
}
let crc32 = u32::deserialize(&mut cursor)?;
let store_compression = String::deserialize(&mut cursor)?;
Ok(if version == 1 {
VersionedFooter::V1 {
crc32,
store_compression,
}
} else if version == 2 {
VersionedFooter::V2 {
crc32,
store_compression,
}
} else {
assert_eq!(version, 3);
VersionedFooter::V3 {
crc32,
store_compression,
}
})
}
}
impl VersionedFooter {
pub fn crc(&self) -> Option<CrcHashU32> {
match self {
VersionedFooter::V3 { crc32, .. } => Some(*crc32),
VersionedFooter::V2 { crc32, .. } => Some(*crc32),
VersionedFooter::V1 { crc32, .. } => Some(*crc32),
VersionedFooter::UnknownVersion { .. } => None,
}
}
}
pub(crate) struct FooterProxy<W: TerminatingWrite> {
@@ -268,10 +144,7 @@ impl<W: TerminatingWrite> Write for FooterProxy<W> {
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
let crc32 = self.hasher.take().unwrap().finalize();
let footer = Footer::new(VersionedFooter::V3 {
crc32,
store_compression: crate::store::COMPRESSION.to_string(),
});
let footer = Footer::new(crc32);
let mut writer = self.writer.take().unwrap();
footer.append_footer(&mut writer)?;
writer.terminate()
@@ -281,140 +154,75 @@ impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
#[cfg(test)]
mod tests {
use super::CrcHashU32;
use super::FooterProxy;
use crate::common::{BinarySerializable, VInt};
use crate::directory::footer::{Footer, VersionedFooter};
use crate::directory::TerminatingWrite;
use byteorder::{ByteOrder, LittleEndian};
use regex::Regex;
use crate::directory::footer::Footer;
use crate::directory::OwnedBytes;
use crate::{
common::BinarySerializable,
directory::{footer::FOOTER_MAGIC_NUMBER, FileSlice},
};
use std::io;
#[test]
fn test_versioned_footer() {
let mut vec = Vec::new();
let footer_proxy = FooterProxy::new(&mut vec);
assert!(footer_proxy.terminate().is_ok());
if crate::store::COMPRESSION == "lz4" {
assert_eq!(vec.len(), 158);
} else if crate::store::COMPRESSION == "snappy" {
assert_eq!(vec.len(), 167);
} else if crate::store::COMPRESSION == "lz4_block" {
assert_eq!(vec.len(), 176);
}
let footer = Footer::deserialize(&mut &vec[..]).unwrap();
assert!(matches!(
footer.versioned_footer,
VersionedFooter::V3 { store_compression, .. }
if store_compression == crate::store::COMPRESSION
));
assert_eq!(&footer.version, crate::version());
fn test_deserialize_footer() {
let mut buf: Vec<u8> = vec![];
let footer = Footer::new(123);
footer.append_footer(&mut buf).unwrap();
let owned_bytes = OwnedBytes::new(buf);
let fileslice = FileSlice::new(Box::new(owned_bytes));
let (footer_deser, _body) = Footer::extract_footer(fileslice).unwrap();
assert_eq!(footer_deser.crc(), footer.crc());
}
#[test]
fn test_serialize_deserialize_footer() {
let mut buffer = Vec::new();
let crc32 = 123456u32;
let footer: Footer = Footer::new(VersionedFooter::V3 {
crc32,
store_compression: "lz4".to_string(),
});
footer.serialize(&mut buffer).unwrap();
let footer_deser = Footer::deserialize(&mut &buffer[..]).unwrap();
assert_eq!(footer_deser, footer);
fn test_deserialize_footer_missing_magic_byte() {
let mut buf: Vec<u8> = vec![];
BinarySerializable::serialize(&0_u32, &mut buf).unwrap();
let wrong_magic_byte: u32 = 5555;
BinarySerializable::serialize(&wrong_magic_byte, &mut buf).unwrap();
let owned_bytes = OwnedBytes::new(buf);
let fileslice = FileSlice::new(Box::new(owned_bytes));
let err = Footer::extract_footer(fileslice).unwrap_err();
assert_eq!(
err.to_string(),
"Footer magic byte mismatch. File corrupted or index was created using old an tantivy version which \
is not supported anymore. Please use tantivy 0.15 or above to recreate the index."
);
}
#[test]
fn footer_length() {
let crc32 = 1111111u32;
let versioned_footer = VersionedFooter::V3 {
crc32,
store_compression: "lz4".to_string(),
};
let mut buf = Vec::new();
versioned_footer.serialize(&mut buf).unwrap();
assert_eq!(buf.len(), 13);
let footer = Footer::new(versioned_footer);
let regex_ptn = Regex::new(
"tantivy v[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.{0,10}, index_format v[0-9]{1,5}",
)
.unwrap();
assert!(regex_ptn.is_match(&footer.meta));
}
fn test_deserialize_footer_wrong_filesize() {
let mut buf: Vec<u8> = vec![];
BinarySerializable::serialize(&100_u32, &mut buf).unwrap();
BinarySerializable::serialize(&FOOTER_MAGIC_NUMBER, &mut buf).unwrap();
#[test]
fn versioned_footer_from_bytes() {
let v_footer_bytes = vec![
// versionned footer length
12 | 128,
// index format version
3,
0,
0,
0,
// crc 32
12,
35,
89,
18,
// compression format
3 | 128,
b'l',
b'z',
b'4',
];
let mut cursor = &v_footer_bytes[..];
let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap();
assert!(cursor.is_empty());
let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32;
let expected_versioned_footer: VersionedFooter = VersionedFooter::V3 {
crc32: expected_crc,
store_compression: "lz4".to_string(),
};
assert_eq!(versioned_footer, expected_versioned_footer);
let mut buffer = Vec::new();
assert!(versioned_footer.serialize(&mut buffer).is_ok());
assert_eq!(&v_footer_bytes[..], &buffer[..]);
}
let owned_bytes = OwnedBytes::new(buf);
#[test]
fn versioned_footer_panic() {
let v_footer_bytes = vec![6u8 | 128u8, 3u8, 0u8, 0u8, 1u8, 0u8, 0u8];
let mut b = &v_footer_bytes[..];
let versioned_footer = VersionedFooter::deserialize(&mut b).unwrap();
assert!(b.is_empty());
let expected_versioned_footer = VersionedFooter::UnknownVersion;
assert_eq!(versioned_footer, expected_versioned_footer);
let mut buf = Vec::new();
assert!(versioned_footer.serialize(&mut buf).is_err());
}
#[test]
#[cfg(not(feature = "lz4"))]
fn compression_mismatch() {
let crc32 = 1111111u32;
let versioned_footer = VersionedFooter::V1 {
crc32,
store_compression: "lz4".to_string(),
};
let footer = Footer::new(versioned_footer);
let res = footer.is_compatible();
assert!(res.is_err());
let fileslice = FileSlice::new(Box::new(owned_bytes));
let err = Footer::extract_footer(fileslice).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
assert_eq!(
err.to_string(),
"File corrupted. The file is smaller than it\'s footer bytes (len=108)."
);
}
#[test]
fn test_deserialize_too_large_footer() {
let mut buf = vec![];
assert!(FooterProxy::new(&mut buf).terminate().is_ok());
let mut long_len_buf = [0u8; 10];
let num_bytes = VInt(super::FOOTER_MAX_LEN as u64 + 1u64).serialize_into(&mut long_len_buf);
buf[0..num_bytes].copy_from_slice(&long_len_buf[..num_bytes]);
let err = Footer::deserialize(&mut &buf[..]).unwrap_err();
let mut buf: Vec<u8> = vec![];
let footer_length = super::FOOTER_MAX_LEN + 1;
BinarySerializable::serialize(&footer_length, &mut buf).unwrap();
BinarySerializable::serialize(&FOOTER_MAGIC_NUMBER, &mut buf).unwrap();
let owned_bytes = OwnedBytes::new(buf);
let fileslice = FileSlice::new(Box::new(owned_bytes));
let err = Footer::extract_footer(fileslice).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert_eq!(
err.to_string(),
"Footer seems invalid as it suggests a footer len of 10001. File is corrupted, \
or the index was created with a different & old version of tantivy."
"Footer seems invalid as it suggests a footer len of 50001. File is corrupted, \
or the index was created with a different & old version of tantivy."
);
}
}

View File

@@ -245,11 +245,7 @@ impl ManagedDirectory {
let mut hasher = Hasher::new();
hasher.update(bytes.as_slice());
let crc = hasher.finalize();
Ok(footer
.versioned_footer
.crc()
.map(|v| v == crc)
.unwrap_or(false))
Ok(footer.crc() == crc)
}
/// List files for which checksum does not match content

View File

@@ -57,7 +57,7 @@ mod writer;
pub trait MultiValueLength {
/// returns the num of values associated to a doc_id
fn get_len(&self, doc_id: DocId) -> u64;
/// returns the sum of num of all values for all doc_ids
/// returns the sum of num values for all doc_ids
fn get_total_len(&self) -> u64;
}

View File

@@ -175,6 +175,7 @@ mod tests_indexsorting {
field: "my_number".to_string(),
order: Order::Asc,
}),
..Default::default()
}),
option.clone(),
);
@@ -206,6 +207,7 @@ mod tests_indexsorting {
field: "my_number".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
option.clone(),
);
@@ -264,6 +266,7 @@ mod tests_indexsorting {
field: "my_number".to_string(),
order: Order::Asc,
}),
..Default::default()
}),
get_text_options(),
);
@@ -288,6 +291,7 @@ mod tests_indexsorting {
field: "my_number".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
get_text_options(),
);
@@ -322,6 +326,7 @@ mod tests_indexsorting {
field: "my_number".to_string(),
order: Order::Asc,
}),
..Default::default()
}),
get_text_options(),
);
@@ -352,6 +357,7 @@ mod tests_indexsorting {
field: "my_number".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
get_text_options(),
);
@@ -387,6 +393,7 @@ mod tests_indexsorting {
field: "my_number".to_string(),
order: Order::Asc,
}),
..Default::default()
}),
get_text_options(),
);

View File

@@ -196,8 +196,8 @@ impl IndexMerger {
return Err(crate::TantivyError::InvalidArgument(err_msg));
}
Ok(IndexMerger {
schema,
index_settings,
schema,
readers,
max_doc,
})
@@ -402,15 +402,15 @@ impl IndexMerger {
.tuple_windows()
.all(|(field_accessor1, field_accessor2)| {
if sort_by_field.order.is_asc() {
return field_accessor1.max_value() <= field_accessor2.min_value();
field_accessor1.max_value() <= field_accessor2.min_value()
} else {
return field_accessor1.min_value() >= field_accessor2.max_value();
field_accessor1.min_value() >= field_accessor2.max_value()
}
});
Ok(everything_is_in_order)
}
pub(crate) fn get_sort_field_accessor<'a, 'b>(
pub(crate) fn get_sort_field_accessor<'b>(
reader: &SegmentReader,
sort_by_field: &'b IndexSortByField,
) -> crate::Result<FastFieldReader<u64>> {
@@ -1024,6 +1024,7 @@ impl IndexMerger {
//
// take 7 in order to not walk over all checkpoints.
|| store_reader.block_checkpoints().take(7).count() < 6
|| store_reader.compressor() != store_writer.compressor()
{
for doc_bytes_res in store_reader.iter_raw(reader.delete_bitset()) {
let doc_bytes = doc_bytes_res?;
@@ -1576,6 +1577,7 @@ mod tests {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
true,
);
@@ -1587,6 +1589,7 @@ mod tests {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
false,
);
@@ -1601,6 +1604,7 @@ mod tests {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
true,
);
@@ -1612,6 +1616,7 @@ mod tests {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
false,
);

View File

@@ -156,6 +156,7 @@ mod tests {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}));
}
@@ -175,6 +176,7 @@ mod tests {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
force_disjunct_segment_sort_values,
);
@@ -265,6 +267,7 @@ mod tests {
field: "intval".to_string(),
order: Order::Asc,
}),
..Default::default()
}),
false,
);
@@ -368,7 +371,6 @@ mod bench_sorted_index_merge {
use crate::IndexSortByField;
use crate::IndexWriter;
use crate::Order;
use futures::executor::block_on;
use test::{self, Bencher};
fn create_index(sort_by_field: Option<IndexSortByField>) -> Index {
let mut schema_builder = Schema::builder();
@@ -376,12 +378,12 @@ mod bench_sorted_index_merge {
.set_fast(Cardinality::SingleValue)
.set_indexed();
let int_field = schema_builder.add_u64_field("intval", int_options);
let int_field = schema_builder.add_u64_field("intval", int_options);
let schema = schema_builder.build();
let index_builder = Index::builder()
.schema(schema)
.settings(IndexSettings { sort_by_field });
let index_builder = Index::builder().schema(schema).settings(IndexSettings {
sort_by_field,
..Default::default()
});
let index = index_builder.create_in_ram().unwrap();
{
@@ -419,7 +421,7 @@ mod bench_sorted_index_merge {
b.iter(|| {
let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader)|{
let u64_reader: FastFieldReader<u64> = reader
let u64_reader: FastFieldReader<u64> = reader.reader
.fast_fields()
.typed_fast_field_reader(field)
.expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
@@ -444,7 +446,7 @@ mod bench_sorted_index_merge {
order: Order::Desc,
};
let index = create_index(Some(sort_by_field.clone()));
let field = index.schema().get_field("intval").unwrap();
//let field = index.schema().get_field("intval").unwrap();
let segments = index.searchable_segments().unwrap();
let merger: IndexMerger =
IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;

View File

@@ -39,9 +39,10 @@ impl SegmentSerializer {
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;
let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
let compressor = segment.index().settings().docstore_compression;
Ok(SegmentSerializer {
segment,
store_writer: StoreWriter::new(store_write),
store_writer: StoreWriter::new(store_write, compressor),
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,

View File

@@ -345,8 +345,11 @@ fn write(
let store_write = serializer
.segment_mut()
.open_write(SegmentComponent::Store)?;
let old_store_writer =
std::mem::replace(&mut serializer.store_writer, StoreWriter::new(store_write));
let compressor = serializer.segment().index().settings().docstore_compression;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
StoreWriter::new(store_write, compressor),
);
old_store_writer.close()?;
let store_read = StoreReader::open(
serializer
@@ -357,7 +360,6 @@ fn write(
let doc_bytes = store_read.get_document_bytes(*old_doc_id)?;
serializer.get_store_writer().store_bytes(&doc_bytes)?;
}
// TODO delete temp store
}
serializer.close()?;
Ok(())

View File

@@ -178,7 +178,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
/// Index format version.
const INDEX_FORMAT_VERSION: u32 = 3;
const INDEX_FORMAT_VERSION: u32 = 4;
/// Structure version for the index.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -187,7 +187,6 @@ pub struct Version {
minor: u32,
patch: u32,
index_format_version: u32,
store_compression: String,
}
impl fmt::Debug for Version {
@@ -201,14 +200,13 @@ static VERSION: Lazy<Version> = Lazy::new(|| Version {
minor: env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
patch: env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(),
index_format_version: INDEX_FORMAT_VERSION,
store_compression: crate::store::COMPRESSION.to_string(),
});
impl ToString for Version {
fn to_string(&self) -> String {
format!(
"tantivy v{}.{}.{}, index_format v{}, store_compression: {}",
self.major, self.minor, self.patch, self.index_format_version, self.store_compression
"tantivy v{}.{}.{}, index_format v{}",
self.major, self.minor, self.patch, self.index_format_version
)
}
}

View File

@@ -1,10 +1,6 @@
use std::io;
/// Name of the compression scheme used in the doc store.
///
/// This name is appended to the version string of tantivy.
pub const COMPRESSION: &'static str = "brotli";
#[inline]
pub fn compress(mut uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
let mut params = brotli::enc::BrotliEncoderParams::default();
params.quality = 5;
@@ -13,6 +9,7 @@ pub fn compress(mut uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result
Ok(())
}
#[inline]
pub fn decompress(mut compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
decompressed.clear();
brotli::BrotliDecompress(&mut compressed, decompressed)?;

View File

@@ -1,22 +0,0 @@
use std::io::{self, Read, Write};
/// Name of the compression scheme used in the doc store.
///
/// This name is appended to the version string of tantivy.
pub const COMPRESSION: &str = "lz4";
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
compressed.clear();
let mut encoder = lz4::EncoderBuilder::new().build(compressed)?;
encoder.write_all(&uncompressed)?;
let (_, encoder_result) = encoder.finish();
encoder_result?;
Ok(())
}
pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
decompressed.clear();
let mut decoder = lz4::Decoder::new(compressed)?;
decoder.read_to_end(decompressed)?;
Ok(())
}

View File

@@ -2,17 +2,13 @@ use std::io::{self};
use core::convert::TryInto;
use lz4_flex::{compress_into, decompress_into};
/// Name of the compression scheme used in the doc store.
///
/// This name is appended to the version string of tantivy.
pub const COMPRESSION: &str = "lz4_block";
#[inline]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
compressed.clear();
let maximum_ouput_size = lz4_flex::block::get_maximum_output_size(uncompressed.len());
compressed.reserve(maximum_ouput_size);
compressed.extend_from_slice(&[0, 0, 0, 0]);
unsafe {
compressed.set_len(maximum_ouput_size + 4);
}
@@ -26,6 +22,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>
Ok(())
}
#[inline]
pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
decompressed.clear();
let uncompressed_size_bytes: &[u8; 4] = compressed

View File

@@ -1,10 +1,6 @@
use std::io::{self, Read, Write};
/// Name of the compression scheme used in the doc store.
///
/// This name is appended to the version string of tantivy.
pub const COMPRESSION: &str = "snappy";
#[inline]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
compressed.clear();
let mut encoder = snap::write::FrameEncoder::new(compressed);
@@ -13,6 +9,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>
Ok(())
}
#[inline]
pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
decompressed.clear();
snap::read::FrameDecoder::new(compressed).read_to_end(decompressed)?;

134
src/store/compressors.rs Normal file
View File

@@ -0,0 +1,134 @@
use serde::{Deserialize, Serialize};
use std::io;
pub trait StoreCompressor {
fn compress(&self, uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>;
fn decompress(&self, compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()>;
fn get_compressor_id() -> u8;
}
/// Compressor can be used on `IndexSettings` to choose
/// the compressor used to compress the doc store.
///
/// The default is Lz4Block, but also depends on the enabled feature flags.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Compressor {
#[serde(rename = "lz4")]
/// Use the lz4 compressor (block format)
Lz4,
#[serde(rename = "brotli")]
/// Use the brotli compressor
Brotli,
#[serde(rename = "snappy")]
/// Use the snap compressor
Snappy,
}
impl Default for Compressor {
fn default() -> Self {
if cfg!(feature = "lz4-compression") {
Compressor::Lz4
} else if cfg!(feature = "brotli-compression") {
Compressor::Brotli
} else if cfg!(feature = "snappy-compression") {
Compressor::Snappy
} else {
panic!(
"all compressor feature flags like are disabled (e.g. lz4-compression), can't choose default compressor"
);
}
}
}
impl Compressor {
pub(crate) fn from_id(id: u8) -> Compressor {
match id {
1 => Compressor::Lz4,
2 => Compressor::Brotli,
3 => Compressor::Snappy,
_ => panic!("unknown compressor id {:?}", id),
}
}
pub(crate) fn get_id(&self) -> u8 {
match self {
Self::Lz4 => 1,
Self::Brotli => 2,
Self::Snappy => 3,
}
}
#[inline]
pub(crate) fn compress(&self, uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
match self {
Self::Lz4 => {
#[cfg(feature = "lz4-compression")]
{
super::compression_lz4_block::compress(uncompressed, compressed)
}
#[cfg(not(feature = "lz4-compression"))]
{
panic!("lz4-compression feature flag not activated");
}
}
Self::Brotli => {
#[cfg(feature = "brotli-compression")]
{
super::compression_brotli::compress(uncompressed, compressed)
}
#[cfg(not(feature = "brotli-compression"))]
{
panic!("brotli-compression-compression feature flag not activated");
}
}
Self::Snappy => {
#[cfg(feature = "snappy-compression")]
{
super::compression_snap::compress(uncompressed, compressed)
}
#[cfg(not(feature = "snappy-compression"))]
{
panic!("snappy-compression feature flag not activated");
}
}
}
}
#[inline]
pub(crate) fn decompress(
&self,
compressed: &[u8],
decompressed: &mut Vec<u8>,
) -> io::Result<()> {
match self {
Self::Lz4 => {
#[cfg(feature = "lz4-compression")]
{
super::compression_lz4_block::decompress(compressed, decompressed)
}
#[cfg(not(feature = "lz4-compression"))]
{
panic!("lz4-compression feature flag not activated");
}
}
Self::Brotli => {
#[cfg(feature = "brotli-compression")]
{
super::compression_brotli::decompress(compressed, decompressed)
}
#[cfg(not(feature = "brotli-compression"))]
{
panic!("brotli-compression feature flag not activated");
}
}
Self::Snappy => {
#[cfg(feature = "snappy-compression")]
{
super::compression_snap::decompress(compressed, decompressed)
}
#[cfg(not(feature = "snappy-compression"))]
{
panic!("snappy-compression feature flag not activated");
}
}
}
}
}

69
src/store/footer.rs Normal file
View File

@@ -0,0 +1,69 @@
use crate::{
common::{BinarySerializable, FixedSize, HasLen},
directory::FileSlice,
store::Compressor,
};
use std::io;
#[derive(Debug, Clone, PartialEq)]
pub struct DocStoreFooter {
pub offset: u64,
pub compressor: Compressor,
}
/// Serialises the footer to a byte-array
/// - offset : 8 bytes
///- compressor id: 1 byte
/// - reserved for future use: 15 bytes
impl BinarySerializable for DocStoreFooter {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
BinarySerializable::serialize(&self.offset, writer)?;
BinarySerializable::serialize(&self.compressor.get_id(), writer)?;
writer.write_all(&[0; 15])?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let offset = u64::deserialize(reader)?;
let compressor_id = u8::deserialize(reader)?;
let mut skip_buf = [0; 15];
reader.read_exact(&mut skip_buf)?;
Ok(DocStoreFooter {
offset,
compressor: Compressor::from_id(compressor_id),
})
}
}
impl FixedSize for DocStoreFooter {
const SIZE_IN_BYTES: usize = 24;
}
impl DocStoreFooter {
pub fn new(offset: u64, compressor: Compressor) -> Self {
DocStoreFooter { offset, compressor }
}
pub fn extract_footer(file: FileSlice) -> io::Result<(DocStoreFooter, FileSlice)> {
if file.len() < DocStoreFooter::SIZE_IN_BYTES {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"File corrupted. The file is smaller than Footer::SIZE_IN_BYTES (len={}).",
file.len()
),
));
}
let (body, footer_slice) = file.split_from_end(DocStoreFooter::SIZE_IN_BYTES);
let mut footer_bytes = footer_slice.read_bytes()?;
let footer = DocStoreFooter::deserialize(&mut footer_bytes)?;
Ok((footer, body))
}
}
#[test]
fn doc_store_footer_test() {
// This test is just to safe guard changes on the footer.
// When the doc store footer is updated, make sure to update also the serialize/deserialize methods
assert_eq!(core::mem::size_of::<DocStoreFooter>(), 16);
}

View File

@@ -33,66 +33,23 @@ and should rely on either
!*/
mod compressors;
mod footer;
mod index;
mod reader;
mod writer;
pub use self::compressors::Compressor;
pub use self::reader::StoreReader;
pub use self::writer::StoreWriter;
// compile_error doesn't scale very well, enum like feature flags would be great to have in Rust
#[cfg(all(feature = "lz4", feature = "brotli"))]
compile_error!("feature `lz4` or `brotli` must not be enabled together.");
#[cfg(all(feature = "lz4_block", feature = "brotli"))]
compile_error!("feature `lz4_block` or `brotli` must not be enabled together.");
#[cfg(all(feature = "lz4_block", feature = "lz4"))]
compile_error!("feature `lz4_block` or `lz4` must not be enabled together.");
#[cfg(all(feature = "lz4_block", feature = "snap"))]
compile_error!("feature `lz4_block` or `snap` must not be enabled together.");
#[cfg(all(feature = "lz4", feature = "snap"))]
compile_error!("feature `lz4` or `snap` must not be enabled together.");
#[cfg(all(feature = "brotli", feature = "snap"))]
compile_error!("feature `brotli` or `snap` must not be enabled together.");
#[cfg(not(any(
feature = "lz4",
feature = "brotli",
feature = "lz4_flex",
feature = "snap"
)))]
compile_error!("all compressors are deactivated via feature-flags, check Cargo.toml for available decompressors.");
#[cfg(feature = "lz4_flex")]
#[cfg(feature = "lz4-compression")]
mod compression_lz4_block;
#[cfg(feature = "lz4_flex")]
pub use self::compression_lz4_block::COMPRESSION;
#[cfg(feature = "lz4_flex")]
use self::compression_lz4_block::{compress, decompress};
#[cfg(feature = "lz4")]
mod compression_lz4;
#[cfg(feature = "lz4")]
pub use self::compression_lz4::COMPRESSION;
#[cfg(feature = "lz4")]
use self::compression_lz4::{compress, decompress};
#[cfg(feature = "brotli")]
#[cfg(feature = "brotli-compression")]
mod compression_brotli;
#[cfg(feature = "brotli")]
pub use self::compression_brotli::COMPRESSION;
#[cfg(feature = "brotli")]
use self::compression_brotli::{compress, decompress};
#[cfg(feature = "snap")]
#[cfg(feature = "snappy-compression")]
mod compression_snap;
#[cfg(feature = "snap")]
pub use self::compression_snap::COMPRESSION;
#[cfg(feature = "snap")]
use self::compression_snap::{compress, decompress};
#[cfg(test)]
pub mod tests {
@@ -109,28 +66,31 @@ pub mod tests {
use crate::{schema::Schema, Index};
use std::path::Path;
pub fn write_lorem_ipsum_store(writer: WritePtr, num_docs: usize) -> Schema {
let mut schema_builder = Schema::builder();
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
let field_title =
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
let lorem = String::from(
"Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed \
const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed \
do eiusmod tempor incididunt ut labore et dolore magna aliqua. \
Ut enim ad minim veniam, quis nostrud exercitation ullamco \
laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure \
dolor in reprehenderit in voluptate velit esse cillum dolore eu \
fugiat nulla pariatur. Excepteur sint occaecat cupidatat non \
proident, sunt in culpa qui officia deserunt mollit anim id est \
laborum.",
);
laborum.";
pub fn write_lorem_ipsum_store(
writer: WritePtr,
num_docs: usize,
compressor: Compressor,
) -> Schema {
let mut schema_builder = Schema::builder();
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
let field_title =
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
let mut store_writer = StoreWriter::new(writer);
let mut store_writer = StoreWriter::new(writer, compressor);
for i in 0..num_docs {
let mut fields: Vec<FieldValue> = Vec::new();
{
let field_value = FieldValue::new(field_body, From::from(lorem.clone()));
let field_value = FieldValue::new(field_body, From::from(LOREM.to_string()));
fields.push(field_value);
}
{
@@ -147,12 +107,11 @@ pub mod tests {
schema
}
#[test]
fn test_store() -> crate::Result<()> {
fn test_store(compressor: Compressor) -> crate::Result<()> {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, 1_000);
let schema = write_lorem_ipsum_store(store_wrt, 1_000, compressor);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
@@ -176,6 +135,22 @@ pub mod tests {
Ok(())
}
#[cfg(feature = "lz4-compression")]
#[test]
fn test_store_lz4_block() -> crate::Result<()> {
test_store(Compressor::Lz4)
}
#[cfg(feature = "snappy-compression")]
#[test]
fn test_store_snap() -> crate::Result<()> {
test_store(Compressor::Snappy)
}
#[cfg(feature = "brotli-compression")]
#[test]
fn test_store_brotli() -> crate::Result<()> {
test_store(Compressor::Brotli)
}
#[test]
fn test_store_with_delete() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
@@ -216,6 +191,67 @@ pub mod tests {
}
Ok(())
}
#[cfg(feature = "snappy-compression")]
#[cfg(feature = "lz4-compression")]
#[test]
fn test_merge_with_changed_compressor() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text_field", TEXT | STORED);
let schema = schema_builder.build();
let index_builder = Index::builder().schema(schema);
let mut index = index_builder.create_in_ram().unwrap();
index.settings_mut().docstore_compression = Compressor::Lz4;
{
let mut index_writer = index.writer_for_tests().unwrap();
// put enough data create enough blocks in the doc store to be considered for stacking
for _ in 0..200 {
index_writer.add_document(doc!(text_field=> LOREM));
}
assert!(index_writer.commit().is_ok());
for _ in 0..200 {
index_writer.add_document(doc!(text_field=> LOREM));
}
assert!(index_writer.commit().is_ok());
}
assert_eq!(
index.reader().unwrap().searcher().segment_readers()[0]
.get_store_reader()
.unwrap()
.compressor(),
Compressor::Lz4
);
// Change compressor, this disables stacking on merging
let index_settings = index.settings_mut();
index_settings.docstore_compression = Compressor::Snappy;
// Merging the segments
{
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests().unwrap();
assert!(block_on(index_writer.merge(&segment_ids)).is_ok());
assert!(index_writer.wait_merging_threads().is_ok());
}
let searcher = index.reader().unwrap().searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let reader = searcher.segment_readers().iter().last().unwrap();
let store = reader.get_store_reader().unwrap();
for doc in store.iter(reader.delete_bitset()).take(50) {
assert_eq!(
*doc?.get_first(text_field).unwrap().text().unwrap(),
LOREM.to_string()
);
}
assert_eq!(store.compressor(), Compressor::Snappy);
Ok(())
}
#[test]
fn test_merge_of_small_segments() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
@@ -265,6 +301,7 @@ mod bench {
use super::tests::write_lorem_ipsum_store;
use crate::directory::Directory;
use crate::directory::RamDirectory;
use crate::store::Compressor;
use crate::store::StoreReader;
use std::path::Path;
use test::Bencher;
@@ -275,7 +312,11 @@ mod bench {
let directory = RamDirectory::create();
let path = Path::new("store");
b.iter(|| {
write_lorem_ipsum_store(directory.open_write(path).unwrap(), 1_000);
write_lorem_ipsum_store(
directory.open_write(path).unwrap(),
1_000,
Compressor::default(),
);
directory.delete(path).unwrap();
});
}
@@ -284,11 +325,13 @@ mod bench {
fn bench_store_decode(b: &mut Bencher) {
let directory = RamDirectory::create();
let path = Path::new("store");
write_lorem_ipsum_store(directory.open_write(path).unwrap(), 1_000);
write_lorem_ipsum_store(
directory.open_write(path).unwrap(),
1_000,
Compressor::default(),
);
let store_file = directory.open_read(path).unwrap();
let store = StoreReader::open(store_file).unwrap();
b.iter(|| {
store.get(12).unwrap();
});
b.iter(|| store.iter(None).collect::<Vec<_>>());
}
}

View File

@@ -1,18 +1,17 @@
use super::decompress;
use super::index::SkipIndex;
use super::Compressor;
use super::{footer::DocStoreFooter, index::SkipIndex};
use crate::directory::{FileSlice, OwnedBytes};
use crate::schema::Document;
use crate::space_usage::StoreSpaceUsage;
use crate::store::index::Checkpoint;
use crate::DocId;
use crate::{common::VInt, fastfield::DeleteBitSet};
use crate::{
common::{BinarySerializable, HasLen},
common::{BinarySerializable, HasLen, VInt},
error::DataCorruption,
fastfield::DeleteBitSet,
};
use lru::LruCache;
use std::io;
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
@@ -24,6 +23,7 @@ type BlockCache = Arc<Mutex<LruCache<usize, Block>>>;
/// Reads document off tantivy's [`Store`](./index.html)
pub struct StoreReader {
compressor: Compressor,
data: FileSlice,
cache: BlockCache,
cache_hits: Arc<AtomicUsize>,
@@ -35,11 +35,14 @@ pub struct StoreReader {
impl StoreReader {
/// Opens a store reader
pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
let (data_file, offset_index_file) = split_file(store_file)?;
let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
let index_data = offset_index_file.read_bytes()?;
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::open(index_data);
Ok(StoreReader {
compressor: footer.compressor,
data: data_file,
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
cache_hits: Default::default(),
@@ -53,6 +56,10 @@ impl StoreReader {
self.skip_index.checkpoints()
}
pub(crate) fn compressor(&self) -> Compressor {
self.compressor
}
fn block_checkpoint(&self, doc_id: DocId) -> Option<Checkpoint> {
self.skip_index.seek(doc_id)
}
@@ -75,7 +82,8 @@ impl StoreReader {
let compressed_block = self.compressed_block(checkpoint)?;
let mut decompressed_block = vec![];
decompress(compressed_block.as_slice(), &mut decompressed_block)?;
self.compressor
.decompress(compressed_block.as_slice(), &mut decompressed_block)?;
let block = OwnedBytes::new(decompressed_block);
self.cache
@@ -230,14 +238,6 @@ impl StoreReader {
}
}
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> {
let (data, footer_len_bytes) = data.split_from_end(size_of::<u64>());
let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?;
let mut serialized_offset_buf = serialized_offset.as_slice();
let offset = u64::deserialize(&mut serialized_offset_buf)? as usize;
Ok(data.split(offset))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -255,7 +255,7 @@ mod tests {
let directory = RamDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500);
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default());
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;

View File

@@ -1,6 +1,6 @@
use super::compress;
use super::index::SkipIndexBuilder;
use super::StoreReader;
use super::{compressors::Compressor, footer::DocStoreFooter};
use crate::common::CountingWriter;
use crate::common::{BinarySerializable, VInt};
use crate::directory::TerminatingWrite;
@@ -21,6 +21,7 @@ const BLOCK_SIZE: usize = 16_384;
/// The skip list index on the other hand, is built in memory.
///
pub struct StoreWriter {
compressor: Compressor,
doc: DocId,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
@@ -34,8 +35,9 @@ impl StoreWriter {
///
/// The store writer will writes blocks on disc as
/// document are added.
pub fn new(writer: WritePtr) -> StoreWriter {
pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
StoreWriter {
compressor,
doc: 0,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
@@ -45,6 +47,10 @@ impl StoreWriter {
}
}
pub(crate) fn compressor(&self) -> Compressor {
self.compressor
}
/// The memory used (inclusive childs)
pub fn mem_usage(&self) -> usize {
self.intermediary_buffer.capacity() + self.current_block.capacity()
@@ -125,7 +131,8 @@ impl StoreWriter {
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
compress(&self.current_block[..], &mut self.intermediary_buffer)?;
self.compressor
.compress(&self.current_block[..], &mut self.intermediary_buffer)?;
let start_offset = self.writer.written_bytes() as usize;
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes() as usize;
@@ -147,8 +154,9 @@ impl StoreWriter {
self.write_and_compress_block()?;
}
let header_offset: u64 = self.writer.written_bytes() as u64;
let footer = DocStoreFooter::new(header_offset, self.compressor);
self.offset_index_writer.write(&mut self.writer)?;
header_offset.serialize(&mut self.writer)?;
footer.serialize(&mut self.writer)?;
self.writer.terminate()
}
}