diff --git a/CHANGELOG.md b/CHANGELOG.md index 97d92ed4c..5277168c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,13 @@ Tantivy 0.15.0 - Simplified positions index format (@fulmicoton) #1022 - Moved bitpacking to bitpacker subcrate and add BlockedBitpacker, which bitpacks blocks of 128 elements (@PSeitz) #1030 - Added support for more-like-this query in tantivy (@evanxg852000) #1011 -- Added support for sorting an index, e.g presorting documents in an index by a timestamp field. This can heavily improve performance for certain scenarios, by utilizing the sorted data (Top-n optimizations). #1026 +- Added support for sorting an index, e.g. presorting documents in an index by a timestamp field. This can heavily improve performance for certain scenarios, by utilizing the sorted data (Top-n optimizations) (@PSeitz). #1026 +- Add iterator over documents in doc store (@PSeitz). #1044 +- Fix log merge policy (@PSeitz). #1043 +- Add detection to avoid small doc store blocks on merge (@PSeitz). #1054 +- Make doc store compression dynamic (@PSeitz). #1060 +- Switch to json for footer version handling (@PSeitz). 
#1060 + Tantivy 0.14.0 ========================= diff --git a/Cargo.toml b/Cargo.toml index d83fc7f1e..0d93e2e08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,6 @@ regex ={ version = "1.5.4", default-features = false, features = ["std"] } tantivy-fst = "0.3" memmap = {version = "0.7", optional=true} lz4_flex = { version = "0.8.0", default-features = false, features = ["checked-decode"], optional = true } -lz4 = { version = "1.23.2", optional = true } brotli = { version = "3.3", optional = true } snap = { version = "1.0.5", optional = true } tempfile = { version = "3.2", optional = true } @@ -77,12 +76,13 @@ debug-assertions = true overflow-checks = true [features] -default = ["mmap", "lz4-block-compression" ] +default = ["mmap", "lz4-compression" ] mmap = ["fs2", "tempfile", "memmap"] + brotli-compression = ["brotli"] -lz4-compression = ["lz4"] -lz4-block-compression = ["lz4_flex"] +lz4-compression = ["lz4_flex"] snappy-compression = ["snap"] + failpoints = ["fail/failpoints"] unstable = [] # useful for benches. 
wasm-bindgen = ["uuid/wasm-bindgen"] diff --git a/appveyor.yml b/appveyor.yml index da451a609..3a6353547 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -18,5 +18,6 @@ install: build: false test_script: - - REM SET RUST_LOG=tantivy,test & cargo test --all --verbose --no-default-features --features lz4-block-compression --features mmap + - REM SET RUST_LOG=tantivy,test & cargo test --all --verbose --no-default-features --features lz4-compression --features mmap + - REM SET RUST_LOG=tantivy,test & cargo test test_store --verbose --no-default-features --features lz4-compression --features snappy-compression --features brotli-compression --features mmap - REM SET RUST_BACKTRACE=1 & cargo build --examples diff --git a/src/common/mod.rs b/src/common/mod.rs index bfa3b1262..ea7507b04 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -8,7 +8,7 @@ pub use self::bitset::BitSet; pub(crate) use self::bitset::TinySet; pub(crate) use self::composite_file::{CompositeFile, CompositeWrite}; pub use self::counting_writer::CountingWriter; -pub use self::serialize::{BinarySerializable, FixedSize}; +pub use self::serialize::{BinarySerializable, DeserializeFrom, FixedSize}; pub use self::vint::{ read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt, }; diff --git a/src/common/serialize.rs b/src/common/serialize.rs index 6b89bbe70..779b839af 100644 --- a/src/common/serialize.rs +++ b/src/common/serialize.rs @@ -14,6 +14,20 @@ pub trait BinarySerializable: fmt::Debug + Sized { fn deserialize(reader: &mut R) -> io::Result; } +pub trait DeserializeFrom { + fn deserialize(&mut self) -> io::Result; +} + +/// Implement deserialize from &[u8] for all types which implement BinarySerializable. 
+/// +/// TryFrom would actually be preferable, but not possible because of the orphan +/// rules (not completely sure if this could be resolved) +impl DeserializeFrom for &[u8] { + fn deserialize(&mut self) -> io::Result { + T::deserialize(self) + } +} + /// `FixedSize` marks a `BinarySerializable` as /// always serializing to the same size. pub trait FixedSize: BinarySerializable { @@ -61,6 +75,11 @@ impl BinarySerializable for Ok((Left::deserialize(reader)?, Right::deserialize(reader)?)) } } +impl FixedSize + for (Left, Right) +{ + const SIZE_IN_BYTES: usize = Left::SIZE_IN_BYTES + Right::SIZE_IN_BYTES; +} impl BinarySerializable for u32 { fn serialize(&self, writer: &mut W) -> io::Result<()> { diff --git a/src/core/index.rs b/src/core/index.rs index 582e3f26f..3621e4d78 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -76,7 +76,7 @@ fn load_metas( /// ); /// /// let schema = schema_builder.build(); -/// let settings = IndexSettings{sort_by_field: Some(IndexSortByField{field:"number".to_string(), order:Order::Asc})}; +/// let settings = IndexSettings{sort_by_field: Some(IndexSortByField{field:"number".to_string(), order:Order::Asc}), ..Default::default()}; /// let index = Index::builder().schema(schema).settings(settings).create_in_ram(); /// /// ``` @@ -173,7 +173,7 @@ impl IndexBuilder { &directory, )?; let mut metas = IndexMeta::with_schema(self.get_expect_schema()?); - metas.index_settings = self.index_settings.clone(); + metas.index_settings = self.index_settings; let index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default()); Ok(index) } @@ -460,6 +460,13 @@ impl Index { pub fn settings(&self) -> &IndexSettings { &self.settings } + + /// Mutable accessor to the index settings + /// + pub fn settings_mut(&mut self) -> &mut IndexSettings { + &mut self.settings + } + /// Accessor to the index schema /// /// The schema is actually cloned. 
diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index 62493f23b..37ca157f4 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -1,7 +1,7 @@ use super::SegmentComponent; -use crate::core::SegmentId; use crate::schema::Schema; use crate::Opstamp; +use crate::{core::SegmentId, store::Compressor}; use census::{Inventory, TrackedObject}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; @@ -233,7 +233,11 @@ impl InnerSegmentMeta { pub struct IndexSettings { /// Sorts the documents by information /// provided in `IndexSortByField` + #[serde(skip_serializing_if = "Option::is_none")] pub sort_by_field: Option, + /// The `Compressor` used to compress the doc store. + #[serde(default)] + pub docstore_compression: Compressor, } /// Settings to presort the documents in an index /// @@ -380,6 +384,7 @@ mod tests { field: "text".to_string(), order: Order::Asc, }), + ..Default::default() }, segments: Vec::new(), schema, @@ -389,7 +394,7 @@ mod tests { let json = serde_json::ser::to_string(&index_metas).expect("serialization failed"); assert_eq!( json, - r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"}},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"# + r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4"},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"# ); } } diff --git a/src/directory/file_slice.rs b/src/directory/file_slice.rs index d6f9d00ad..bcd85a2f7 100644 --- a/src/directory/file_slice.rs +++ b/src/directory/file_slice.rs @@ -147,6 +147,13 @@ impl FileSlice { self.slice(from_offset..self.len()) } + /// Returns a slice from the end. 
+ /// + /// Equivalent to `.slice(self.len() - from_offset..self.len())` + pub fn slice_from_end(&self, from_offset: usize) -> FileSlice { + self.slice(self.len() - from_offset..self.len()) + } + /// Like `.slice(...)` but enforcing only the `to` /// boundary. /// diff --git a/src/directory/footer.rs b/src/directory/footer.rs index 10b8b6f15..79eaa53e9 100644 --- a/src/directory/footer.rs +++ b/src/directory/footer.rs @@ -1,69 +1,45 @@ -use crate::common::{BinarySerializable, CountingWriter, FixedSize, HasLen, VInt}; use crate::directory::error::Incompatibility; use crate::directory::FileSlice; -use crate::directory::{AntiCallToken, TerminatingWrite}; -use crate::Version; +use crate::{ + common::{BinarySerializable, CountingWriter, DeserializeFrom, FixedSize, HasLen}, + directory::{AntiCallToken, TerminatingWrite}, + Version, INDEX_FORMAT_VERSION, +}; use crc32fast::Hasher; +use serde::{Deserialize, Serialize}; use std::io; use std::io::Write; -const FOOTER_MAX_LEN: usize = 10_000; +const FOOTER_MAX_LEN: u32 = 50_000; + +/// The magic number of the footer to identify corruption +/// or an old version of the footer. 
+const FOOTER_MAGIC_NUMBER: u32 = 1337; type CrcHashU32 = u32; -#[derive(Debug, Clone, PartialEq)] +/// A Footer is appended to every file +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Footer { pub version: Version, - pub meta: String, - pub versioned_footer: VersionedFooter, -} - -/// Serialises the footer to a byte-array -/// - versioned_footer_len : 4 bytes -///- versioned_footer: variable bytes -/// - meta_len: 4 bytes -/// - meta: variable bytes -/// - version_len: 4 bytes -/// - version json: variable bytes -impl BinarySerializable for Footer { - fn serialize(&self, writer: &mut W) -> io::Result<()> { - BinarySerializable::serialize(&self.versioned_footer, writer)?; - BinarySerializable::serialize(&self.meta, writer)?; - let version_string = - serde_json::to_string(&self.version).map_err(|_err| io::ErrorKind::InvalidInput)?; - BinarySerializable::serialize(&version_string, writer)?; - Ok(()) - } - - fn deserialize(reader: &mut R) -> io::Result { - let versioned_footer = VersionedFooter::deserialize(reader)?; - let meta = String::deserialize(reader)?; - let version_json = String::deserialize(reader)?; - let version = serde_json::from_str(&version_json)?; - Ok(Footer { - version, - meta, - versioned_footer, - }) - } + pub crc: CrcHashU32, } impl Footer { - pub fn new(versioned_footer: VersionedFooter) -> Self { + pub fn new(crc: CrcHashU32) -> Self { let version = crate::VERSION.clone(); - let meta = version.to_string(); - Footer { - version, - meta, - versioned_footer, - } + Footer { version, crc } } + pub fn crc(&self) -> CrcHashU32 { + self.crc + } pub fn append_footer(&self, mut write: &mut W) -> io::Result<()> { let mut counting_write = CountingWriter::wrap(&mut write); - self.serialize(&mut counting_write)?; - let written_len = counting_write.written_bytes(); - (written_len as u32).serialize(write)?; + counting_write.write_all(serde_json::to_string(&self)?.as_ref())?; + let footer_payload_len = counting_write.written_bytes(); + 
BinarySerializable::serialize(&(footer_payload_len as u32), write)?; + BinarySerializable::serialize(&(FOOTER_MAGIC_NUMBER as u32), write)?; Ok(()) } @@ -77,12 +53,47 @@ impl Footer { ), )); } - let (body_footer, footer_len_file) = file.split_from_end(u32::SIZE_IN_BYTES); - let mut footer_len_bytes = footer_len_file.read_bytes()?; - let footer_len = u32::deserialize(&mut footer_len_bytes)? as usize; - let (body, footer) = body_footer.split_from_end(footer_len); - let mut footer_bytes = footer.read_bytes()?; - let footer = Footer::deserialize(&mut footer_bytes)?; + + let footer_metadata_len = <(u32, u32)>::SIZE_IN_BYTES; + let (footer_len, footer_magic_byte): (u32, u32) = file + .slice_from_end(footer_metadata_len) + .read_bytes()? + .as_ref() + .deserialize()?; + + if footer_magic_byte != FOOTER_MAGIC_NUMBER { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Footer magic byte mismatch. File corrupted or index was created using old an tantivy version which is not supported anymore. Please use tantivy 0.15 or above to recreate the index.", + )); + } + + if footer_len > FOOTER_MAX_LEN { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Footer seems invalid as it suggests a footer len of {}. File is corrupted, \ + or the index was created with a different & old version of tantivy.", + footer_len + ), + )); + } + let total_footer_size = footer_len as usize + footer_metadata_len; + if file.len() < total_footer_size { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "File corrupted. 
The file is smaller than it's footer bytes (len={}).", + total_footer_size + ), + )); + } + + let footer: Footer = serde_json::from_slice(&file.read_bytes_slice( + file.len() - total_footer_size..file.len() - footer_metadata_len as usize, + )?)?; + + let body = file.slice_to(file.len() - total_footer_size); Ok((footer, body)) } @@ -90,151 +101,16 @@ impl Footer { /// Has to be called after `extract_footer` to make sure it's not accessing uninitialised memory pub fn is_compatible(&self) -> Result<(), Incompatibility> { let library_version = crate::version(); - match &self.versioned_footer { - VersionedFooter::V1 { - crc32: _crc, - store_compression, - } => { - if &library_version.store_compression != store_compression { - return Err(Incompatibility::CompressionMismatch { - library_compression_format: library_version.store_compression.to_string(), - index_compression_format: store_compression.to_string(), - }); - } - Ok(()) - } - VersionedFooter::V2 { - crc32: _crc, - store_compression, - } => { - if &library_version.store_compression != store_compression { - return Err(Incompatibility::CompressionMismatch { - library_compression_format: library_version.store_compression.to_string(), - index_compression_format: store_compression.to_string(), - }); - } - Ok(()) - } - VersionedFooter::V3 { - crc32: _crc, - store_compression, - } => { - if &library_version.store_compression != store_compression { - return Err(Incompatibility::CompressionMismatch { - library_compression_format: library_version.store_compression.to_string(), - index_compression_format: store_compression.to_string(), - }); - } - Ok(()) - } - VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch { + if self.version.index_format_version < 4 + || self.version.index_format_version > INDEX_FORMAT_VERSION + { + return Err(Incompatibility::IndexMismatch { library_version: library_version.clone(), index_version: self.version.clone(), - }), + }); } - } -} - -/// Footer that includes a crc32 hash 
that enables us to checksum files in the index -#[derive(Debug, Clone, PartialEq)] -pub enum VersionedFooter { - UnknownVersion, - V1 { - crc32: CrcHashU32, - store_compression: String, - }, - // Introduction of the Block WAND information. - V2 { - crc32: CrcHashU32, - store_compression: String, - }, - // Block wand max termfred on 1 byte - V3 { - crc32: CrcHashU32, - store_compression: String, - }, -} - -impl BinarySerializable for VersionedFooter { - fn serialize(&self, writer: &mut W) -> io::Result<()> { - let mut buf = Vec::new(); - match self { - VersionedFooter::V3 { - crc32, - store_compression: compression, - } => { - // Serializes a valid `VersionedFooter` or panics if the version is unknown - // [ version | crc_hash | compression_mode ] - // [ 0..4 | 4..8 | variable ] - BinarySerializable::serialize(&3u32, &mut buf)?; - BinarySerializable::serialize(crc32, &mut buf)?; - BinarySerializable::serialize(compression, &mut buf)?; - } - VersionedFooter::V2 { .. } - | VersionedFooter::V1 { .. } - | VersionedFooter::UnknownVersion => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Cannot serialize an unknown versioned footer ", - )); - } - } - BinarySerializable::serialize(&VInt(buf.len() as u64), writer)?; - assert!(buf.len() <= FOOTER_MAX_LEN); - writer.write_all(&buf[..])?; Ok(()) } - - fn deserialize(reader: &mut R) -> io::Result { - let len = VInt::deserialize(reader)?.0 as usize; - if len > FOOTER_MAX_LEN { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!( - "Footer seems invalid as it suggests a footer len of {}. 
File is corrupted, \ - or the index was created with a different & old version of tantivy.", - len - ), - )); - } - let mut buf = vec![0u8; len]; - reader.read_exact(&mut buf[..])?; - let mut cursor = &buf[..]; - let version = u32::deserialize(&mut cursor)?; - if version > 3 { - return Ok(VersionedFooter::UnknownVersion); - } - let crc32 = u32::deserialize(&mut cursor)?; - let store_compression = String::deserialize(&mut cursor)?; - Ok(if version == 1 { - VersionedFooter::V1 { - crc32, - store_compression, - } - } else if version == 2 { - VersionedFooter::V2 { - crc32, - store_compression, - } - } else { - assert_eq!(version, 3); - VersionedFooter::V3 { - crc32, - store_compression, - } - }) - } -} - -impl VersionedFooter { - pub fn crc(&self) -> Option { - match self { - VersionedFooter::V3 { crc32, .. } => Some(*crc32), - VersionedFooter::V2 { crc32, .. } => Some(*crc32), - VersionedFooter::V1 { crc32, .. } => Some(*crc32), - VersionedFooter::UnknownVersion { .. } => None, - } - } } pub(crate) struct FooterProxy { @@ -268,10 +144,7 @@ impl Write for FooterProxy { impl TerminatingWrite for FooterProxy { fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { let crc32 = self.hasher.take().unwrap().finalize(); - let footer = Footer::new(VersionedFooter::V3 { - crc32, - store_compression: crate::store::COMPRESSION.to_string(), - }); + let footer = Footer::new(crc32); let mut writer = self.writer.take().unwrap(); footer.append_footer(&mut writer)?; writer.terminate() @@ -281,140 +154,75 @@ impl TerminatingWrite for FooterProxy { #[cfg(test)] mod tests { - use super::CrcHashU32; - use super::FooterProxy; - use crate::common::{BinarySerializable, VInt}; - use crate::directory::footer::{Footer, VersionedFooter}; - use crate::directory::TerminatingWrite; - use byteorder::{ByteOrder, LittleEndian}; - use regex::Regex; + use crate::directory::footer::Footer; + use crate::directory::OwnedBytes; + use crate::{ + common::BinarySerializable, + 
directory::{footer::FOOTER_MAGIC_NUMBER, FileSlice}, + }; use std::io; #[test] - fn test_versioned_footer() { - let mut vec = Vec::new(); - let footer_proxy = FooterProxy::new(&mut vec); - assert!(footer_proxy.terminate().is_ok()); - if crate::store::COMPRESSION == "lz4" { - assert_eq!(vec.len(), 158); - } else if crate::store::COMPRESSION == "snappy" { - assert_eq!(vec.len(), 167); - } else if crate::store::COMPRESSION == "lz4_block" { - assert_eq!(vec.len(), 176); - } - let footer = Footer::deserialize(&mut &vec[..]).unwrap(); - assert!(matches!( - footer.versioned_footer, - VersionedFooter::V3 { store_compression, .. } - if store_compression == crate::store::COMPRESSION - )); - assert_eq!(&footer.version, crate::version()); + fn test_deserialize_footer() { + let mut buf: Vec = vec![]; + let footer = Footer::new(123); + footer.append_footer(&mut buf).unwrap(); + let owned_bytes = OwnedBytes::new(buf); + let fileslice = FileSlice::new(Box::new(owned_bytes)); + let (footer_deser, _body) = Footer::extract_footer(fileslice).unwrap(); + assert_eq!(footer_deser.crc(), footer.crc()); } - #[test] - fn test_serialize_deserialize_footer() { - let mut buffer = Vec::new(); - let crc32 = 123456u32; - let footer: Footer = Footer::new(VersionedFooter::V3 { - crc32, - store_compression: "lz4".to_string(), - }); - footer.serialize(&mut buffer).unwrap(); - let footer_deser = Footer::deserialize(&mut &buffer[..]).unwrap(); - assert_eq!(footer_deser, footer); + fn test_deserialize_footer_missing_magic_byte() { + let mut buf: Vec = vec![]; + BinarySerializable::serialize(&0_u32, &mut buf).unwrap(); + let wrong_magic_byte: u32 = 5555; + BinarySerializable::serialize(&wrong_magic_byte, &mut buf).unwrap(); + + let owned_bytes = OwnedBytes::new(buf); + + let fileslice = FileSlice::new(Box::new(owned_bytes)); + let err = Footer::extract_footer(fileslice).unwrap_err(); + assert_eq!( + err.to_string(), + "Footer magic byte mismatch. 
File corrupted or index was created using old an tantivy version which \ + is not supported anymore. Please use tantivy 0.15 or above to recreate the index." + ); } - #[test] - fn footer_length() { - let crc32 = 1111111u32; - let versioned_footer = VersionedFooter::V3 { - crc32, - store_compression: "lz4".to_string(), - }; - let mut buf = Vec::new(); - versioned_footer.serialize(&mut buf).unwrap(); - assert_eq!(buf.len(), 13); - let footer = Footer::new(versioned_footer); - let regex_ptn = Regex::new( - "tantivy v[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.{0,10}, index_format v[0-9]{1,5}", - ) - .unwrap(); - assert!(regex_ptn.is_match(&footer.meta)); - } + fn test_deserialize_footer_wrong_filesize() { + let mut buf: Vec = vec![]; + BinarySerializable::serialize(&100_u32, &mut buf).unwrap(); + BinarySerializable::serialize(&FOOTER_MAGIC_NUMBER, &mut buf).unwrap(); - #[test] - fn versioned_footer_from_bytes() { - let v_footer_bytes = vec![ - // versionned footer length - 12 | 128, - // index format version - 3, - 0, - 0, - 0, - // crc 32 - 12, - 35, - 89, - 18, - // compression format - 3 | 128, - b'l', - b'z', - b'4', - ]; - let mut cursor = &v_footer_bytes[..]; - let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap(); - assert!(cursor.is_empty()); - let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32; - let expected_versioned_footer: VersionedFooter = VersionedFooter::V3 { - crc32: expected_crc, - store_compression: "lz4".to_string(), - }; - assert_eq!(versioned_footer, expected_versioned_footer); - let mut buffer = Vec::new(); - assert!(versioned_footer.serialize(&mut buffer).is_ok()); - assert_eq!(&v_footer_bytes[..], &buffer[..]); - } + let owned_bytes = OwnedBytes::new(buf); - #[test] - fn versioned_footer_panic() { - let v_footer_bytes = vec![6u8 | 128u8, 3u8, 0u8, 0u8, 1u8, 0u8, 0u8]; - let mut b = &v_footer_bytes[..]; - let versioned_footer = VersionedFooter::deserialize(&mut b).unwrap(); - 
assert!(b.is_empty()); - let expected_versioned_footer = VersionedFooter::UnknownVersion; - assert_eq!(versioned_footer, expected_versioned_footer); - let mut buf = Vec::new(); - assert!(versioned_footer.serialize(&mut buf).is_err()); - } - - #[test] - #[cfg(not(feature = "lz4"))] - fn compression_mismatch() { - let crc32 = 1111111u32; - let versioned_footer = VersionedFooter::V1 { - crc32, - store_compression: "lz4".to_string(), - }; - let footer = Footer::new(versioned_footer); - let res = footer.is_compatible(); - assert!(res.is_err()); + let fileslice = FileSlice::new(Box::new(owned_bytes)); + let err = Footer::extract_footer(fileslice).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof); + assert_eq!( + err.to_string(), + "File corrupted. The file is smaller than it\'s footer bytes (len=108)." + ); } #[test] fn test_deserialize_too_large_footer() { - let mut buf = vec![]; - assert!(FooterProxy::new(&mut buf).terminate().is_ok()); - let mut long_len_buf = [0u8; 10]; - let num_bytes = VInt(super::FOOTER_MAX_LEN as u64 + 1u64).serialize_into(&mut long_len_buf); - buf[0..num_bytes].copy_from_slice(&long_len_buf[..num_bytes]); - let err = Footer::deserialize(&mut &buf[..]).unwrap_err(); + let mut buf: Vec = vec![]; + + let footer_length = super::FOOTER_MAX_LEN + 1; + BinarySerializable::serialize(&footer_length, &mut buf).unwrap(); + BinarySerializable::serialize(&FOOTER_MAGIC_NUMBER, &mut buf).unwrap(); + + let owned_bytes = OwnedBytes::new(buf); + + let fileslice = FileSlice::new(Box::new(owned_bytes)); + let err = Footer::extract_footer(fileslice).unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::InvalidData); assert_eq!( err.to_string(), - "Footer seems invalid as it suggests a footer len of 10001. File is corrupted, \ - or the index was created with a different & old version of tantivy." + "Footer seems invalid as it suggests a footer len of 50001. File is corrupted, \ + or the index was created with a different & old version of tantivy." 
); } } diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 0c69a2689..a3637001f 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -245,11 +245,7 @@ impl ManagedDirectory { let mut hasher = Hasher::new(); hasher.update(bytes.as_slice()); let crc = hasher.finalize(); - Ok(footer - .versioned_footer - .crc() - .map(|v| v == crc) - .unwrap_or(false)) + Ok(footer.crc() == crc) } /// List files for which checksum does not match content diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 385c89e60..f73cd040d 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -57,7 +57,7 @@ mod writer; pub trait MultiValueLength { /// returns the num of values associated to a doc_id fn get_len(&self, doc_id: DocId) -> u64; - /// returns the sum of num of all values for all doc_ids + /// returns the sum of num values for all doc_ids fn get_total_len(&self) -> u64; } diff --git a/src/indexer/doc_id_mapping.rs b/src/indexer/doc_id_mapping.rs index a3760e509..ba9b2cb10 100644 --- a/src/indexer/doc_id_mapping.rs +++ b/src/indexer/doc_id_mapping.rs @@ -175,6 +175,7 @@ mod tests_indexsorting { field: "my_number".to_string(), order: Order::Asc, }), + ..Default::default() }), option.clone(), ); @@ -206,6 +207,7 @@ mod tests_indexsorting { field: "my_number".to_string(), order: Order::Desc, }), + ..Default::default() }), option.clone(), ); @@ -264,6 +266,7 @@ mod tests_indexsorting { field: "my_number".to_string(), order: Order::Asc, }), + ..Default::default() }), get_text_options(), ); @@ -288,6 +291,7 @@ mod tests_indexsorting { field: "my_number".to_string(), order: Order::Desc, }), + ..Default::default() }), get_text_options(), ); @@ -322,6 +326,7 @@ mod tests_indexsorting { field: "my_number".to_string(), order: Order::Asc, }), + ..Default::default() }), get_text_options(), ); @@ -352,6 +357,7 @@ mod tests_indexsorting { field: "my_number".to_string(), order: Order::Desc, }), + 
..Default::default() }), get_text_options(), ); @@ -387,6 +393,7 @@ mod tests_indexsorting { field: "my_number".to_string(), order: Order::Asc, }), + ..Default::default() }), get_text_options(), ); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index df15a925b..087288d8c 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -196,8 +196,8 @@ impl IndexMerger { return Err(crate::TantivyError::InvalidArgument(err_msg)); } Ok(IndexMerger { - schema, index_settings, + schema, readers, max_doc, }) @@ -402,15 +402,15 @@ impl IndexMerger { .tuple_windows() .all(|(field_accessor1, field_accessor2)| { if sort_by_field.order.is_asc() { - return field_accessor1.max_value() <= field_accessor2.min_value(); + field_accessor1.max_value() <= field_accessor2.min_value() } else { - return field_accessor1.min_value() >= field_accessor2.max_value(); + field_accessor1.min_value() >= field_accessor2.max_value() } }); Ok(everything_is_in_order) } - pub(crate) fn get_sort_field_accessor<'a, 'b>( + pub(crate) fn get_sort_field_accessor<'b>( reader: &SegmentReader, sort_by_field: &'b IndexSortByField, ) -> crate::Result> { @@ -1024,6 +1024,7 @@ impl IndexMerger { // // take 7 in order to not walk over all checkpoints. 
|| store_reader.block_checkpoints().take(7).count() < 6 + || store_reader.compressor() != store_writer.compressor() { for doc_bytes_res in store_reader.iter_raw(reader.delete_bitset()) { let doc_bytes = doc_bytes_res?; @@ -1576,6 +1577,7 @@ mod tests { field: "intval".to_string(), order: Order::Desc, }), + ..Default::default() }), true, ); @@ -1587,6 +1589,7 @@ mod tests { field: "intval".to_string(), order: Order::Desc, }), + ..Default::default() }), false, ); @@ -1601,6 +1604,7 @@ mod tests { field: "intval".to_string(), order: Order::Desc, }), + ..Default::default() }), true, ); @@ -1612,6 +1616,7 @@ mod tests { field: "intval".to_string(), order: Order::Desc, }), + ..Default::default() }), false, ); diff --git a/src/indexer/merger_sorted_index_test.rs b/src/indexer/merger_sorted_index_test.rs index a02adde46..0544f6234 100644 --- a/src/indexer/merger_sorted_index_test.rs +++ b/src/indexer/merger_sorted_index_test.rs @@ -156,6 +156,7 @@ mod tests { field: "intval".to_string(), order: Order::Desc, }), + ..Default::default() })); } @@ -175,6 +176,7 @@ mod tests { field: "intval".to_string(), order: Order::Desc, }), + ..Default::default() }), force_disjunct_segment_sort_values, ); @@ -265,6 +267,7 @@ mod tests { field: "intval".to_string(), order: Order::Asc, }), + ..Default::default() }), false, ); @@ -368,7 +371,6 @@ mod bench_sorted_index_merge { use crate::IndexSortByField; use crate::IndexWriter; use crate::Order; - use futures::executor::block_on; use test::{self, Bencher}; fn create_index(sort_by_field: Option) -> Index { let mut schema_builder = Schema::builder(); @@ -376,12 +378,12 @@ mod bench_sorted_index_merge { .set_fast(Cardinality::SingleValue) .set_indexed(); let int_field = schema_builder.add_u64_field("intval", int_options); - let int_field = schema_builder.add_u64_field("intval", int_options); let schema = schema_builder.build(); - let index_builder = Index::builder() - .schema(schema) - .settings(IndexSettings { sort_by_field }); + let 
index_builder = Index::builder().schema(schema).settings(IndexSettings { + sort_by_field, + ..Default::default() + }); let index = index_builder.create_in_ram().unwrap(); { @@ -419,7 +421,7 @@ mod bench_sorted_index_merge { b.iter(|| { let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader)|{ - let u64_reader: FastFieldReader = reader + let u64_reader: FastFieldReader = reader.reader .fast_fields() .typed_fast_field_reader(field) .expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen."); @@ -444,7 +446,7 @@ mod bench_sorted_index_merge { order: Order::Desc, }; let index = create_index(Some(sort_by_field.clone())); - let field = index.schema().get_field("intval").unwrap(); + //let field = index.schema().get_field("intval").unwrap(); let segments = index.searchable_segments().unwrap(); let merger: IndexMerger = IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?; diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index e71964088..d1a119b80 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -39,9 +39,10 @@ impl SegmentSerializer { let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?; let postings_serializer = InvertedIndexSerializer::open(&mut segment)?; + let compressor = segment.index().settings().docstore_compression; Ok(SegmentSerializer { segment, - store_writer: StoreWriter::new(store_write), + store_writer: StoreWriter::new(store_write, compressor), fast_field_serializer, fieldnorms_serializer: Some(fieldnorms_serializer), postings_serializer, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 55b6ede7e..ac737340e 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -345,8 +345,11 @@ fn write( let store_write = serializer .segment_mut() .open_write(SegmentComponent::Store)?; - let old_store_writer = - std::mem::replace(&mut 
serializer.store_writer, StoreWriter::new(store_write)); + let compressor = serializer.segment().index().settings().docstore_compression; + let old_store_writer = std::mem::replace( + &mut serializer.store_writer, + StoreWriter::new(store_write, compressor), + ); old_store_writer.close()?; let store_read = StoreReader::open( serializer @@ -357,7 +360,6 @@ fn write( let doc_bytes = store_read.get_document_bytes(*old_doc_id)?; serializer.get_store_writer().store_bytes(&doc_bytes)?; } - // TODO delete temp store } serializer.close()?; Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 954ae1c50..2f696efd1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -178,7 +178,7 @@ use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; /// Index format version. -const INDEX_FORMAT_VERSION: u32 = 3; +const INDEX_FORMAT_VERSION: u32 = 4; /// Structure version for the index. #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -187,7 +187,6 @@ pub struct Version { minor: u32, patch: u32, index_format_version: u32, - store_compression: String, } impl fmt::Debug for Version { @@ -201,14 +200,13 @@ static VERSION: Lazy = Lazy::new(|| Version { minor: env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(), patch: env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(), index_format_version: INDEX_FORMAT_VERSION, - store_compression: crate::store::COMPRESSION.to_string(), }); impl ToString for Version { fn to_string(&self) -> String { format!( - "tantivy v{}.{}.{}, index_format v{}, store_compression: {}", - self.major, self.minor, self.patch, self.index_format_version, self.store_compression + "tantivy v{}.{}.{}, index_format v{}", + self.major, self.minor, self.patch, self.index_format_version ) } } diff --git a/src/store/compression_brotli.rs b/src/store/compression_brotli.rs index 7a254b382..faf26f866 100644 --- a/src/store/compression_brotli.rs +++ b/src/store/compression_brotli.rs @@ -1,10 +1,6 @@ use std::io; -/// Name of the compression scheme used in the doc store. 
-/// -/// This name is appended to the version string of tantivy. -pub const COMPRESSION: &'static str = "brotli"; - +#[inline] pub fn compress(mut uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> { let mut params = brotli::enc::BrotliEncoderParams::default(); params.quality = 5; @@ -13,6 +9,7 @@ pub fn compress(mut uncompressed: &[u8], compressed: &mut Vec) -> io::Result Ok(()) } +#[inline] pub fn decompress(mut compressed: &[u8], decompressed: &mut Vec) -> io::Result<()> { decompressed.clear(); brotli::BrotliDecompress(&mut compressed, decompressed)?; diff --git a/src/store/compression_lz4.rs b/src/store/compression_lz4.rs deleted file mode 100644 index 9fd079d92..000000000 --- a/src/store/compression_lz4.rs +++ /dev/null @@ -1,22 +0,0 @@ -use std::io::{self, Read, Write}; - -/// Name of the compression scheme used in the doc store. -/// -/// This name is appended to the version string of tantivy. -pub const COMPRESSION: &str = "lz4"; - -pub fn compress(uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> { - compressed.clear(); - let mut encoder = lz4::EncoderBuilder::new().build(compressed)?; - encoder.write_all(&uncompressed)?; - let (_, encoder_result) = encoder.finish(); - encoder_result?; - Ok(()) -} - -pub fn decompress(compressed: &[u8], decompressed: &mut Vec) -> io::Result<()> { - decompressed.clear(); - let mut decoder = lz4::Decoder::new(compressed)?; - decoder.read_to_end(decompressed)?; - Ok(()) -} diff --git a/src/store/compression_lz4_block.rs b/src/store/compression_lz4_block.rs index dfce75e66..4008ce690 100644 --- a/src/store/compression_lz4_block.rs +++ b/src/store/compression_lz4_block.rs @@ -2,17 +2,13 @@ use std::io::{self}; use core::convert::TryInto; use lz4_flex::{compress_into, decompress_into}; -/// Name of the compression scheme used in the doc store. -/// -/// This name is appended to the version string of tantivy. 
-pub const COMPRESSION: &str = "lz4_block"; +#[inline] pub fn compress(uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> { compressed.clear(); let maximum_ouput_size = lz4_flex::block::get_maximum_output_size(uncompressed.len()); compressed.reserve(maximum_ouput_size); - compressed.extend_from_slice(&[0, 0, 0, 0]); unsafe { compressed.set_len(maximum_ouput_size + 4); } @@ -26,6 +22,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> Ok(()) } +#[inline] pub fn decompress(compressed: &[u8], decompressed: &mut Vec) -> io::Result<()> { decompressed.clear(); let uncompressed_size_bytes: &[u8; 4] = compressed diff --git a/src/store/compression_snap.rs b/src/store/compression_snap.rs index 43e19eb85..a3bd044ee 100644 --- a/src/store/compression_snap.rs +++ b/src/store/compression_snap.rs @@ -1,10 +1,6 @@ use std::io::{self, Read, Write}; -/// Name of the compression scheme used in the doc store. -/// -/// This name is appended to the version string of tantivy. 
-pub const COMPRESSION: &str = "snappy"; - +#[inline] pub fn compress(uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> { compressed.clear(); let mut encoder = snap::write::FrameEncoder::new(compressed); @@ -13,6 +9,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> Ok(()) } +#[inline] pub fn decompress(compressed: &[u8], decompressed: &mut Vec) -> io::Result<()> { decompressed.clear(); snap::read::FrameDecoder::new(compressed).read_to_end(decompressed)?; diff --git a/src/store/compressors.rs b/src/store/compressors.rs new file mode 100644 index 000000000..d7aac07b6 --- /dev/null +++ b/src/store/compressors.rs @@ -0,0 +1,134 @@ +use serde::{Deserialize, Serialize}; +use std::io; + +pub trait StoreCompressor { + fn compress(&self, uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()>; + fn decompress(&self, compressed: &[u8], decompressed: &mut Vec) -> io::Result<()>; + fn get_compressor_id() -> u8; +} + +/// Compressor can be used on `IndexSettings` to choose +/// the compressor used to compress the doc store. +/// +/// The default is Lz4Block, but also depends on the enabled feature flags. +#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum Compressor { + #[serde(rename = "lz4")] + /// Use the lz4 compressor (block format) + Lz4, + #[serde(rename = "brotli")] + /// Use the brotli compressor + Brotli, + #[serde(rename = "snappy")] + /// Use the snap compressor + Snappy, +} + +impl Default for Compressor { + fn default() -> Self { + if cfg!(feature = "lz4-compression") { + Compressor::Lz4 + } else if cfg!(feature = "brotli-compression") { + Compressor::Brotli + } else if cfg!(feature = "snappy-compression") { + Compressor::Snappy + } else { + panic!( + "all compressor feature flags like are disabled (e.g. 
lz4-compression), can't choose default compressor" + ); + } + } +} + +impl Compressor { + pub(crate) fn from_id(id: u8) -> Compressor { + match id { + 1 => Compressor::Lz4, + 2 => Compressor::Brotli, + 3 => Compressor::Snappy, + _ => panic!("unknown compressor id {:?}", id), + } + } + pub(crate) fn get_id(&self) -> u8 { + match self { + Self::Lz4 => 1, + Self::Brotli => 2, + Self::Snappy => 3, + } + } + #[inline] + pub(crate) fn compress(&self, uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> { + match self { + Self::Lz4 => { + #[cfg(feature = "lz4-compression")] + { + super::compression_lz4_block::compress(uncompressed, compressed) + } + #[cfg(not(feature = "lz4-compression"))] + { + panic!("lz4-compression feature flag not activated"); + } + } + Self::Brotli => { + #[cfg(feature = "brotli-compression")] + { + super::compression_brotli::compress(uncompressed, compressed) + } + #[cfg(not(feature = "brotli-compression"))] + { + panic!("brotli-compression-compression feature flag not activated"); + } + } + Self::Snappy => { + #[cfg(feature = "snappy-compression")] + { + super::compression_snap::compress(uncompressed, compressed) + } + #[cfg(not(feature = "snappy-compression"))] + { + panic!("snappy-compression feature flag not activated"); + } + } + } + } + + #[inline] + pub(crate) fn decompress( + &self, + compressed: &[u8], + decompressed: &mut Vec, + ) -> io::Result<()> { + match self { + Self::Lz4 => { + #[cfg(feature = "lz4-compression")] + { + super::compression_lz4_block::decompress(compressed, decompressed) + } + #[cfg(not(feature = "lz4-compression"))] + { + panic!("lz4-compression feature flag not activated"); + } + } + Self::Brotli => { + #[cfg(feature = "brotli-compression")] + { + super::compression_brotli::decompress(compressed, decompressed) + } + #[cfg(not(feature = "brotli-compression"))] + { + panic!("brotli-compression feature flag not activated"); + } + } + Self::Snappy => { + #[cfg(feature = "snappy-compression")] + { + 
super::compression_snap::decompress(compressed, decompressed) + } + #[cfg(not(feature = "snappy-compression"))] + { + panic!("snappy-compression feature flag not activated"); + } + } + } + } +} diff --git a/src/store/footer.rs b/src/store/footer.rs new file mode 100644 index 000000000..1c5f2817b --- /dev/null +++ b/src/store/footer.rs @@ -0,0 +1,69 @@ +use crate::{ + common::{BinarySerializable, FixedSize, HasLen}, + directory::FileSlice, + store::Compressor, +}; +use std::io; + +#[derive(Debug, Clone, PartialEq)] +pub struct DocStoreFooter { + pub offset: u64, + pub compressor: Compressor, +} + +/// Serialises the footer to a byte-array +/// - offset : 8 bytes +///- compressor id: 1 byte +/// - reserved for future use: 15 bytes +impl BinarySerializable for DocStoreFooter { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + BinarySerializable::serialize(&self.offset, writer)?; + BinarySerializable::serialize(&self.compressor.get_id(), writer)?; + writer.write_all(&[0; 15])?; + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let offset = u64::deserialize(reader)?; + let compressor_id = u8::deserialize(reader)?; + let mut skip_buf = [0; 15]; + reader.read_exact(&mut skip_buf)?; + Ok(DocStoreFooter { + offset, + compressor: Compressor::from_id(compressor_id), + }) + } +} + +impl FixedSize for DocStoreFooter { + const SIZE_IN_BYTES: usize = 24; +} + +impl DocStoreFooter { + pub fn new(offset: u64, compressor: Compressor) -> Self { + DocStoreFooter { offset, compressor } + } + + pub fn extract_footer(file: FileSlice) -> io::Result<(DocStoreFooter, FileSlice)> { + if file.len() < DocStoreFooter::SIZE_IN_BYTES { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "File corrupted. 
The file is smaller than Footer::SIZE_IN_BYTES (len={}).", + file.len() + ), + )); + } + let (body, footer_slice) = file.split_from_end(DocStoreFooter::SIZE_IN_BYTES); + let mut footer_bytes = footer_slice.read_bytes()?; + let footer = DocStoreFooter::deserialize(&mut footer_bytes)?; + Ok((footer, body)) + } +} + +#[test] +fn doc_store_footer_test() { + // This test is just to safe guard changes on the footer. + // When the doc store footer is updated, make sure to update also the serialize/deserialize methods + assert_eq!(core::mem::size_of::(), 16); +} diff --git a/src/store/mod.rs b/src/store/mod.rs index 2f04dcd46..e5803f9f7 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -33,66 +33,23 @@ and should rely on either !*/ +mod compressors; +mod footer; mod index; mod reader; mod writer; +pub use self::compressors::Compressor; pub use self::reader::StoreReader; pub use self::writer::StoreWriter; -// compile_error doesn't scale very well, enum like feature flags would be great to have in Rust -#[cfg(all(feature = "lz4", feature = "brotli"))] -compile_error!("feature `lz4` or `brotli` must not be enabled together."); - -#[cfg(all(feature = "lz4_block", feature = "brotli"))] -compile_error!("feature `lz4_block` or `brotli` must not be enabled together."); - -#[cfg(all(feature = "lz4_block", feature = "lz4"))] -compile_error!("feature `lz4_block` or `lz4` must not be enabled together."); - -#[cfg(all(feature = "lz4_block", feature = "snap"))] -compile_error!("feature `lz4_block` or `snap` must not be enabled together."); - -#[cfg(all(feature = "lz4", feature = "snap"))] -compile_error!("feature `lz4` or `snap` must not be enabled together."); - -#[cfg(all(feature = "brotli", feature = "snap"))] -compile_error!("feature `brotli` or `snap` must not be enabled together."); - -#[cfg(not(any( - feature = "lz4", - feature = "brotli", - feature = "lz4_flex", - feature = "snap" -)))] -compile_error!("all compressors are deactivated via feature-flags, check Cargo.toml 
for available decompressors."); - -#[cfg(feature = "lz4_flex")] +#[cfg(feature = "lz4-compression")] mod compression_lz4_block; -#[cfg(feature = "lz4_flex")] -pub use self::compression_lz4_block::COMPRESSION; -#[cfg(feature = "lz4_flex")] -use self::compression_lz4_block::{compress, decompress}; -#[cfg(feature = "lz4")] -mod compression_lz4; -#[cfg(feature = "lz4")] -pub use self::compression_lz4::COMPRESSION; -#[cfg(feature = "lz4")] -use self::compression_lz4::{compress, decompress}; - -#[cfg(feature = "brotli")] +#[cfg(feature = "brotli-compression")] mod compression_brotli; -#[cfg(feature = "brotli")] -pub use self::compression_brotli::COMPRESSION; -#[cfg(feature = "brotli")] -use self::compression_brotli::{compress, decompress}; -#[cfg(feature = "snap")] +#[cfg(feature = "snappy-compression")] mod compression_snap; -#[cfg(feature = "snap")] -pub use self::compression_snap::COMPRESSION; -#[cfg(feature = "snap")] -use self::compression_snap::{compress, decompress}; #[cfg(test)] pub mod tests { @@ -109,28 +66,31 @@ pub mod tests { use crate::{schema::Schema, Index}; use std::path::Path; - pub fn write_lorem_ipsum_store(writer: WritePtr, num_docs: usize) -> Schema { - let mut schema_builder = Schema::builder(); - let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored()); - let field_title = - schema_builder.add_text_field("title", TextOptions::default().set_stored()); - let schema = schema_builder.build(); - let lorem = String::from( - "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed \ + const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed \ do eiusmod tempor incididunt ut labore et dolore magna aliqua. \ Ut enim ad minim veniam, quis nostrud exercitation ullamco \ laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure \ dolor in reprehenderit in voluptate velit esse cillum dolore eu \ fugiat nulla pariatur. 
Excepteur sint occaecat cupidatat non \ proident, sunt in culpa qui officia deserunt mollit anim id est \ - laborum.", - ); + laborum."; + + pub fn write_lorem_ipsum_store( + writer: WritePtr, + num_docs: usize, + compressor: Compressor, + ) -> Schema { + let mut schema_builder = Schema::builder(); + let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored()); + let field_title = + schema_builder.add_text_field("title", TextOptions::default().set_stored()); + let schema = schema_builder.build(); { - let mut store_writer = StoreWriter::new(writer); + let mut store_writer = StoreWriter::new(writer, compressor); for i in 0..num_docs { let mut fields: Vec = Vec::new(); { - let field_value = FieldValue::new(field_body, From::from(lorem.clone())); + let field_value = FieldValue::new(field_body, From::from(LOREM.to_string())); fields.push(field_value); } { @@ -147,12 +107,11 @@ pub mod tests { schema } - #[test] - fn test_store() -> crate::Result<()> { + fn test_store(compressor: Compressor) -> crate::Result<()> { let path = Path::new("store"); let directory = RamDirectory::create(); let store_wrt = directory.open_write(path)?; - let schema = write_lorem_ipsum_store(store_wrt, 1_000); + let schema = write_lorem_ipsum_store(store_wrt, 1_000, compressor); let field_title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; let store = StoreReader::open(store_file)?; @@ -176,6 +135,22 @@ pub mod tests { Ok(()) } + #[cfg(feature = "lz4-compression")] + #[test] + fn test_store_lz4_block() -> crate::Result<()> { + test_store(Compressor::Lz4) + } + #[cfg(feature = "snappy-compression")] + #[test] + fn test_store_snap() -> crate::Result<()> { + test_store(Compressor::Snappy) + } + #[cfg(feature = "brotli-compression")] + #[test] + fn test_store_brotli() -> crate::Result<()> { + test_store(Compressor::Brotli) + } + #[test] fn test_store_with_delete() -> crate::Result<()> { let mut schema_builder = 
schema::Schema::builder(); @@ -216,6 +191,67 @@ pub mod tests { } Ok(()) } + + #[cfg(feature = "snappy-compression")] + #[cfg(feature = "lz4-compression")] + #[test] + fn test_merge_with_changed_compressor() -> crate::Result<()> { + let mut schema_builder = schema::Schema::builder(); + + let text_field = schema_builder.add_text_field("text_field", TEXT | STORED); + let schema = schema_builder.build(); + let index_builder = Index::builder().schema(schema); + + let mut index = index_builder.create_in_ram().unwrap(); + index.settings_mut().docstore_compression = Compressor::Lz4; + { + let mut index_writer = index.writer_for_tests().unwrap(); + // put enough data create enough blocks in the doc store to be considered for stacking + for _ in 0..200 { + index_writer.add_document(doc!(text_field=> LOREM)); + } + assert!(index_writer.commit().is_ok()); + for _ in 0..200 { + index_writer.add_document(doc!(text_field=> LOREM)); + } + assert!(index_writer.commit().is_ok()); + } + assert_eq!( + index.reader().unwrap().searcher().segment_readers()[0] + .get_store_reader() + .unwrap() + .compressor(), + Compressor::Lz4 + ); + // Change compressor, this disables stacking on merging + let index_settings = index.settings_mut(); + index_settings.docstore_compression = Compressor::Snappy; + // Merging the segments + { + let segment_ids = index + .searchable_segment_ids() + .expect("Searchable segments failed."); + let mut index_writer = index.writer_for_tests().unwrap(); + assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); + assert!(index_writer.wait_merging_threads().is_ok()); + } + + let searcher = index.reader().unwrap().searcher(); + assert_eq!(searcher.segment_readers().len(), 1); + let reader = searcher.segment_readers().iter().last().unwrap(); + let store = reader.get_store_reader().unwrap(); + + for doc in store.iter(reader.delete_bitset()).take(50) { + assert_eq!( + *doc?.get_first(text_field).unwrap().text().unwrap(), + LOREM.to_string() + ); + } + 
assert_eq!(store.compressor(), Compressor::Snappy); + + Ok(()) + } + #[test] fn test_merge_of_small_segments() -> crate::Result<()> { let mut schema_builder = schema::Schema::builder(); @@ -265,6 +301,7 @@ mod bench { use super::tests::write_lorem_ipsum_store; use crate::directory::Directory; use crate::directory::RamDirectory; + use crate::store::Compressor; use crate::store::StoreReader; use std::path::Path; use test::Bencher; @@ -275,7 +312,11 @@ mod bench { let directory = RamDirectory::create(); let path = Path::new("store"); b.iter(|| { - write_lorem_ipsum_store(directory.open_write(path).unwrap(), 1_000); + write_lorem_ipsum_store( + directory.open_write(path).unwrap(), + 1_000, + Compressor::default(), + ); directory.delete(path).unwrap(); }); } @@ -284,11 +325,13 @@ mod bench { fn bench_store_decode(b: &mut Bencher) { let directory = RamDirectory::create(); let path = Path::new("store"); - write_lorem_ipsum_store(directory.open_write(path).unwrap(), 1_000); + write_lorem_ipsum_store( + directory.open_write(path).unwrap(), + 1_000, + Compressor::default(), + ); let store_file = directory.open_read(path).unwrap(); let store = StoreReader::open(store_file).unwrap(); - b.iter(|| { - store.get(12).unwrap(); - }); + b.iter(|| store.iter(None).collect::>()); } } diff --git a/src/store/reader.rs b/src/store/reader.rs index 8cddc7150..810c5c5b8 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -1,18 +1,17 @@ -use super::decompress; -use super::index::SkipIndex; +use super::Compressor; +use super::{footer::DocStoreFooter, index::SkipIndex}; use crate::directory::{FileSlice, OwnedBytes}; use crate::schema::Document; use crate::space_usage::StoreSpaceUsage; use crate::store::index::Checkpoint; use crate::DocId; -use crate::{common::VInt, fastfield::DeleteBitSet}; use crate::{ - common::{BinarySerializable, HasLen}, + common::{BinarySerializable, HasLen, VInt}, error::DataCorruption, + fastfield::DeleteBitSet, }; use lru::LruCache; use std::io; -use 
std::mem::size_of; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; @@ -24,6 +23,7 @@ type BlockCache = Arc>>; /// Reads document off tantivy's [`Store`](./index.html) pub struct StoreReader { + compressor: Compressor, data: FileSlice, cache: BlockCache, cache_hits: Arc, @@ -35,11 +35,14 @@ pub struct StoreReader { impl StoreReader { /// Opens a store reader pub fn open(store_file: FileSlice) -> io::Result { - let (data_file, offset_index_file) = split_file(store_file)?; + let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?; + + let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize); let index_data = offset_index_file.read_bytes()?; let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len()); let skip_index = SkipIndex::open(index_data); Ok(StoreReader { + compressor: footer.compressor, data: data_file, cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))), cache_hits: Default::default(), @@ -53,6 +56,10 @@ impl StoreReader { self.skip_index.checkpoints() } + pub(crate) fn compressor(&self) -> Compressor { + self.compressor + } + fn block_checkpoint(&self, doc_id: DocId) -> Option { self.skip_index.seek(doc_id) } @@ -75,7 +82,8 @@ impl StoreReader { let compressed_block = self.compressed_block(checkpoint)?; let mut decompressed_block = vec![]; - decompress(compressed_block.as_slice(), &mut decompressed_block)?; + self.compressor + .decompress(compressed_block.as_slice(), &mut decompressed_block)?; let block = OwnedBytes::new(decompressed_block); self.cache @@ -230,14 +238,6 @@ impl StoreReader { } } -fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> { - let (data, footer_len_bytes) = data.split_from_end(size_of::()); - let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?; - let mut serialized_offset_buf = serialized_offset.as_slice(); - let offset = u64::deserialize(&mut serialized_offset_buf)? 
as usize; - Ok(data.split(offset)) -} - #[cfg(test)] mod tests { use super::*; @@ -255,7 +255,7 @@ mod tests { let directory = RamDirectory::create(); let path = Path::new("store"); let writer = directory.open_write(path)?; - let schema = write_lorem_ipsum_store(writer, 500); + let schema = write_lorem_ipsum_store(writer, 500, Compressor::default()); let title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; let store = StoreReader::open(store_file)?; diff --git a/src/store/writer.rs b/src/store/writer.rs index 0c261f88c..93adba7eb 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,6 +1,6 @@ -use super::compress; use super::index::SkipIndexBuilder; use super::StoreReader; +use super::{compressors::Compressor, footer::DocStoreFooter}; use crate::common::CountingWriter; use crate::common::{BinarySerializable, VInt}; use crate::directory::TerminatingWrite; @@ -21,6 +21,7 @@ const BLOCK_SIZE: usize = 16_384; /// The skip list index on the other hand, is built in memory. /// pub struct StoreWriter { + compressor: Compressor, doc: DocId, first_doc_in_block: DocId, offset_index_writer: SkipIndexBuilder, @@ -34,8 +35,9 @@ impl StoreWriter { /// /// The store writer will writes blocks on disc as /// document are added. 
- pub fn new(writer: WritePtr) -> StoreWriter { + pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter { StoreWriter { + compressor, doc: 0, first_doc_in_block: 0, offset_index_writer: SkipIndexBuilder::new(), @@ -45,6 +47,10 @@ impl StoreWriter { } } + pub(crate) fn compressor(&self) -> Compressor { + self.compressor + } + /// The memory used (inclusive childs) pub fn mem_usage(&self) -> usize { self.intermediary_buffer.capacity() + self.current_block.capacity() @@ -125,7 +131,8 @@ impl StoreWriter { fn write_and_compress_block(&mut self) -> io::Result<()> { assert!(self.doc > 0); self.intermediary_buffer.clear(); - compress(&self.current_block[..], &mut self.intermediary_buffer)?; + self.compressor + .compress(&self.current_block[..], &mut self.intermediary_buffer)?; let start_offset = self.writer.written_bytes() as usize; self.writer.write_all(&self.intermediary_buffer)?; let end_offset = self.writer.written_bytes() as usize; @@ -147,8 +154,9 @@ impl StoreWriter { self.write_and_compress_block()?; } let header_offset: u64 = self.writer.written_bytes() as u64; + let footer = DocStoreFooter::new(header_offset, self.compressor); self.offset_index_writer.write(&mut self.writer)?; - header_offset.serialize(&mut self.writer)?; + footer.serialize(&mut self.writer)?; self.writer.terminate() } }