Compare commits

...

10 Commits

Author SHA1 Message Date
PSeitz
7ce8a65619 fix: doc store for files larger 4GB (#1856)
Fixes an issue in the skip list deserialization, which incorrectly deserialized the byte start offset as a u32.
`get_doc` will fail for any doc that lives in a block whose start offset is larger than u32::MAX (~4GB).
This causes index corruption if a segment with a doc store larger than 4GB is merged.

tantivy version 0.19 is affected.
2023-03-13 15:07:55 +09:00
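For illustration, a minimal standalone sketch (not tantivy code) of why a u32 start offset cannot address doc store blocks past 4 GiB; the offset value below is arbitrary:

    // Standalone illustration: any block start offset above u32::MAX is truncated
    // when it is decoded as a u32, so the reader seeks to the wrong position.
    fn main() {
        let start_offset: u64 = 8_000_000_000; // a block starting beyond 4 GiB
        let decoded_as_u32 = start_offset as u32 as u64; // what a u32-based decoder recovers
        assert_ne!(start_offset, decoded_as_u32); // 8_000_000_000 != 3_705_032_704
    }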
PSeitz
7bf0a14041 fix: auto downgrade index record option, instead of vint error (#1857)
Previously: thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: IoError(Custom { kind: InvalidData, error: "Reach end of buffer while reading VInt" })', src/main.rs:46:14
Now: the requested index record option is automatically downgraded to the next available level.
2023-03-13 15:07:28 +09:00
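For illustration, a sketch of the new behavior using the `downgrade` helper added to `IndexRecordOption` in this compare (see the hunk further down); the surrounding code is illustrative only:

    use tantivy::schema::IndexRecordOption;

    fn main() {
        // Positions were requested, but the field only stores term frequencies:
        // instead of failing while reading a VInt that was never written, the
        // requested option is now downgraded to the best level actually available.
        let requested = IndexRecordOption::WithFreqsAndPositions;
        let available = IndexRecordOption::WithFreqs;
        assert_eq!(requested.downgrade(available), IndexRecordOption::WithFreqs);
    }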
PSeitz
c91d4e4e65 fix sort order test for term aggregation (#1858)
fix sort order test for term aggregation
fix invalid request test
2023-03-13 13:49:10 +08:00
PSeitz
6f6f639170 fmt code, update lz4_flex (#1838)
formatting on nightly changed
2023-03-13 14:14:15 +09:00
Paul Masurel
a022e97dc2 Bumped tantivy version 2023-03-13 14:10:41 +09:00
Paul Masurel
6474a0f58e Created branch specifically for Quickwit 0.5 2023-03-11 12:27:20 +09:00
PSeitz
0f20787917 fix doc store cache docs (#1821)
* fix doc store cache docs

addresses an issue reported in #1820

* rename doc_store_cache_size
2023-01-23 07:06:49 +01:00
Paul Masurel
2874554ee4 Removed the sorting logic that forced column type to be sorted like (#1816)
* Removed the sorting logic that forced column type to be sorted like ColumnTypes.

* add comments

Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>
2023-01-20 12:43:28 +01:00
PSeitz
cbc70a9eae Cargo.toml cleanup (#1817) 2023-01-20 12:30:35 +01:00
PSeitz
226d0f88bc add columnar to workspace (#1808) 2023-01-20 11:47:10 +01:00
21 changed files with 217 additions and 303 deletions

View File

@@ -1,6 +1,6 @@
 [package]
 name = "tantivy"
-version = "0.19.0"
+version = "0.19.1-quickwit"
 authors = ["Paul Masurel <paul.masurel@gmail.com>"]
 license = "MIT"
 categories = ["database-implementations", "data-structures"]
@@ -23,7 +23,7 @@ regex = { version = "1.5.5", default-features = false, features = ["std", "unico
 aho-corasick = "0.7"
 tantivy-fst = "0.4.0"
 memmap2 = { version = "0.5.3", optional = true }
-lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
+lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"], optional = true }
 brotli = { version = "3.3.4", optional = true }
 zstd = { version = "0.12", optional = true, default-features = false }
 snap = { version = "1.0.5", optional = true }
@@ -55,6 +55,7 @@ measure_time = "0.8.2"
 async-trait = "0.1.53"
 arc-swap = "1.5.0"
+#columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
 sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
 stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
 tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }
@@ -107,7 +108,7 @@ unstable = [] # useful for benches.
 quickwit = ["sstable"]

 [workspace]
-members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "tokenizer-api"]
+members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "tokenizer-api", "columnar"]

 # Following the "fail" crate best practises, we isolate
 # tests that define specific behavior in fail check points

View File

@@ -5,28 +5,23 @@ edition = "2021"
 license = "MIT"

 [dependencies]
+itertools = "0.10.5"
+log = "0.4.17"
+fnv = "1.0.7"
+fastdivide = "0.4.0"
+rand = { version = "0.8.5", optional = true }
+measure_time = { version = "0.8.2", optional = true }
+prettytable-rs = { version = "0.10.0", optional = true }
 stacker = { path = "../stacker", package="tantivy-stacker"}
-serde_json = "1"
-thiserror = "1"
-fnv = "1"
 sstable = { path = "../sstable", package = "tantivy-sstable" }
 common = { path = "../common", package = "tantivy-common" }
-itertools = "0.10"
-log = "0.4"
 tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
-prettytable-rs = {version="0.10.0", optional= true}
-rand = {version="0.8.3", optional= true}
-fastdivide = "0.4"
-measure_time = { version="0.8.2", optional=true}

 [dev-dependencies]
-proptest = "1"
-more-asserts = "0.3.0"
-rand = "0.8.3"
+proptest = "1.0.0"
+more-asserts = "0.3.1"
+rand = "0.8.5"

-# temporary
-[workspace]
-members = []

 [features]
 unstable = []

View File

@@ -1,7 +1,8 @@
 use std::collections::HashMap;

-use crate::column_index::optional_index::set_block::dense::DENSE_BLOCK_NUM_BYTES;
-use crate::column_index::optional_index::set_block::{DenseBlockCodec, SparseBlockCodec};
+use crate::column_index::optional_index::set_block::{
+    DenseBlockCodec, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
+};
 use crate::column_index::optional_index::{Set, SetCodec};

 fn test_set_helper<C: SetCodec<Item = u16>>(vals: &[u16]) -> usize {

View File

@@ -3,24 +3,22 @@ use std::net::Ipv6Addr;
 use crate::value::NumericalType;
 use crate::InvalidData;

-/// The column type represents the column type and can fit on 6-bits.
-///
-/// - bits[0..3]: Column category type.
-/// - bits[3..6]: Numerical type if necessary.
-#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)]
+/// The column type represents the column type.
+/// Any changes need to be propagated to `COLUMN_TYPES`.
+#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Ord, PartialOrd)]
 #[repr(u8)]
 pub enum ColumnType {
     I64 = 0u8,
     U64 = 1u8,
     F64 = 2u8,
-    Bytes = 10u8,
-    Str = 14u8,
-    Bool = 18u8,
-    IpAddr = 22u8,
-    DateTime = 26u8,
+    Bytes = 3u8,
+    Str = 4u8,
+    Bool = 5u8,
+    IpAddr = 6u8,
+    DateTime = 7u8,
 }

-#[cfg(test)]
+// The order needs to match _exactly_ the order in the enum
 const COLUMN_TYPES: [ColumnType; 8] = [
     ColumnType::I64,
     ColumnType::U64,
@@ -38,18 +36,7 @@ impl ColumnType {
     }

     pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
-        use ColumnType::*;
-        match code {
-            0u8 => Ok(I64),
-            1u8 => Ok(U64),
-            2u8 => Ok(F64),
-            10u8 => Ok(Bytes),
-            14u8 => Ok(Str),
-            18u8 => Ok(Bool),
-            22u8 => Ok(IpAddr),
-            26u8 => Ok(Self::DateTime),
-            _ => Err(InvalidData),
-        }
+        COLUMN_TYPES.get(code as usize).copied().ok_or(InvalidData)
     }
 }
@@ -64,18 +51,6 @@ impl From<NumericalType> for ColumnType {
 }

 impl ColumnType {
-    /// get column type category
-    pub(crate) fn column_type_category(self) -> ColumnTypeCategory {
-        match self {
-            ColumnType::I64 | ColumnType::U64 | ColumnType::F64 => ColumnTypeCategory::Numerical,
-            ColumnType::Bytes => ColumnTypeCategory::Bytes,
-            ColumnType::Str => ColumnTypeCategory::Str,
-            ColumnType::Bool => ColumnTypeCategory::Bool,
-            ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
-            ColumnType::DateTime => ColumnTypeCategory::DateTime,
-        }
-    }
-
     pub fn numerical_type(&self) -> Option<NumericalType> {
         match self {
             ColumnType::I64 => Some(NumericalType::I64),
@@ -154,70 +129,20 @@ impl HasAssociatedColumnType for Ipv6Addr {
     }
 }

-/// Column types are grouped into different categories that
-/// corresponds to the different types of `JsonValue` types.
-///
-/// The columnar writer will apply coercion rules to make sure that
-/// at most one column exist per `ColumnTypeCategory`.
-///
-/// See also [README.md].
-#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
-#[repr(u8)]
-pub enum ColumnTypeCategory {
-    Bool,
-    Str,
-    Numerical,
-    DateTime,
-    Bytes,
-    IpAddr,
-}
-
-impl From<ColumnType> for ColumnTypeCategory {
-    fn from(column_type: ColumnType) -> Self {
-        match column_type {
-            ColumnType::I64 => ColumnTypeCategory::Numerical,
-            ColumnType::U64 => ColumnTypeCategory::Numerical,
-            ColumnType::F64 => ColumnTypeCategory::Numerical,
-            ColumnType::Bytes => ColumnTypeCategory::Bytes,
-            ColumnType::Str => ColumnTypeCategory::Str,
-            ColumnType::Bool => ColumnTypeCategory::Bool,
-            ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
-            ColumnType::DateTime => ColumnTypeCategory::DateTime,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
-    use std::collections::HashSet;
-
     use super::*;
     use crate::Cardinality;

     #[test]
     fn test_column_type_to_code() {
-        let mut column_type_set: HashSet<ColumnType> = HashSet::new();
-        for code in u8::MIN..=u8::MAX {
-            if let Ok(column_type) = ColumnType::try_from_code(code) {
-                assert_eq!(column_type.to_code(), code);
-                assert!(column_type_set.insert(column_type));
+        for (code, expected_column_type) in super::COLUMN_TYPES.iter().copied().enumerate() {
+            if let Ok(column_type) = ColumnType::try_from_code(code as u8) {
+                assert_eq!(column_type, expected_column_type);
             }
         }
-        assert_eq!(column_type_set.len(), super::COLUMN_TYPES.len());
-    }
-
-    #[test]
-    fn test_column_category_sort_consistent_with_column_type_sort() {
-        // This is a very important property because we
-        // we need to serialize colunmn in the right order.
-        let mut column_types: Vec<ColumnType> = super::COLUMN_TYPES.iter().copied().collect();
-        column_types.sort_by_key(|col| col.to_code());
-        let column_categories: Vec<ColumnTypeCategory> = column_types
-            .into_iter()
-            .map(ColumnTypeCategory::from)
-            .collect();
-        for (prev, next) in column_categories.iter().zip(column_categories.iter()) {
-            assert!(prev <= next);
+        for code in COLUMN_TYPES.len() as u8..=u8::MAX {
+            assert!(ColumnType::try_from_code(code as u8).is_err());
         }
     }
 }
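
In short, the hunks above turn the sparse bit-packed codes into plain sequential indices, so decoding becomes a bounds-checked array lookup. A condensed, self-contained sketch of that scheme (illustrative, not the crate's code):

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    #[repr(u8)]
    enum ColumnType { I64, U64, F64, Bytes, Str, Bool, IpAddr, DateTime }

    // The order must match the enum exactly, as the diff's comment notes.
    const COLUMN_TYPES: [ColumnType; 8] = [
        ColumnType::I64, ColumnType::U64, ColumnType::F64, ColumnType::Bytes,
        ColumnType::Str, ColumnType::Bool, ColumnType::IpAddr, ColumnType::DateTime,
    ];

    fn try_from_code(code: u8) -> Option<ColumnType> {
        // Sequential discriminants make decoding a simple indexed lookup.
        COLUMN_TYPES.get(code as usize).copied()
    }

    fn main() {
        assert_eq!(try_from_code(4), Some(ColumnType::Str));
        assert_eq!(try_from_code(9), None); // out-of-range codes are rejected
    }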

View File

@@ -1,9 +1,9 @@
 use std::collections::HashMap;
 use std::io;

-use super::column_type::ColumnTypeCategory;
 use crate::columnar::ColumnarReader;
 use crate::dynamic_column::DynamicColumn;
+use crate::ColumnType;

 pub enum MergeDocOrder {
     /// Columnar tables are simply stacked one above the other.
@@ -35,7 +35,40 @@
     }
 }

-pub fn collect_columns(
+/// Column types are grouped into different categories.
+/// After merge, all columns belonging to the same category are coerced to
+/// the same column type.
+///
+/// In practise, today, only Numerical colummns are coerced into one type today.
+///
+/// See also [README.md].
+#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
+#[repr(u8)]
+enum ColumnTypeCategory {
+    Bool,
+    Str,
+    Numerical,
+    DateTime,
+    Bytes,
+    IpAddr,
+}
+
+impl From<ColumnType> for ColumnTypeCategory {
+    fn from(column_type: ColumnType) -> Self {
+        match column_type {
+            ColumnType::I64 => ColumnTypeCategory::Numerical,
+            ColumnType::U64 => ColumnTypeCategory::Numerical,
+            ColumnType::F64 => ColumnTypeCategory::Numerical,
+            ColumnType::Bytes => ColumnTypeCategory::Bytes,
+            ColumnType::Str => ColumnTypeCategory::Str,
+            ColumnType::Bool => ColumnTypeCategory::Bool,
+            ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
+            ColumnType::DateTime => ColumnTypeCategory::DateTime,
+        }
+    }
+}
+
+fn collect_columns(
     columnar_readers: &[&ColumnarReader],
 ) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
     // Each column name may have multiple types of column associated.
@@ -51,7 +84,7 @@ pub fn collect_columns(
             .or_default();
         let columns = column_type_to_handles
-            .entry(handle.column_type().column_type_category())
+            .entry(handle.column_type().into())
             .or_default();
         columns.push(handle.open()?);
     }
@@ -62,10 +95,9 @@
     Ok(field_name_to_group)
 }

-/// Cast numerical type columns to the same type
-pub(crate) fn normalize_columns(
-    map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>,
-) {
+/// Coerce numerical type columns to the same type
+/// TODO rename to `coerce_columns`
+fn normalize_columns(map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>) {
     for (_field_name, type_category_to_columns) in map.iter_mut() {
         for (type_category, columns) in type_category_to_columns {
             if type_category == &ColumnTypeCategory::Numerical {

View File

@@ -184,10 +184,12 @@ impl CompatibleNumericalTypes {
 }

 impl NumericalColumnWriter {
-    pub fn column_type_and_cardinality(&self, num_docs: RowId) -> (NumericalType, Cardinality) {
-        let numerical_type = self.compatible_numerical_types.to_numerical_type();
-        let cardinality = self.column_writer.get_cardinality(num_docs);
-        (numerical_type, cardinality)
+    pub fn numerical_type(&self) -> NumericalType {
+        self.compatible_numerical_types.to_numerical_type()
+    }
+
+    pub fn cardinality(&self, num_docs: RowId) -> Cardinality {
+        self.column_writer.get_cardinality(num_docs)
     }

     pub fn record_numerical_value(

View File

@@ -15,7 +15,7 @@ use crate::column_index::SerializableColumnIndex;
 use crate::column_values::{
     ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
 };
-use crate::columnar::column_type::{ColumnType, ColumnTypeCategory};
+use crate::columnar::column_type::ColumnType;
 use crate::columnar::writer::column_writers::{
     ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
 };
@@ -276,35 +276,40 @@
     }

     pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
         let mut serializer = ColumnarSerializer::new(wrt);
-        let mut columns: Vec<(&[u8], ColumnTypeCategory, Addr)> = self
+        let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
             .numerical_field_hash_map
             .iter()
-            .map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Numerical, addr))
+            .map(|(column_name, addr, _)| {
+                let numerical_column_writer: NumericalColumnWriter =
+                    self.numerical_field_hash_map.read(addr);
+                let column_type = numerical_column_writer.numerical_type().into();
+                (column_name, column_type, addr)
+            })
             .collect();
         columns.extend(
             self.bytes_field_hash_map
                 .iter()
-                .map(|(term, addr, _)| (term, ColumnTypeCategory::Bytes, addr)),
+                .map(|(term, addr, _)| (term, ColumnType::Bytes, addr)),
         );
         columns.extend(
             self.str_field_hash_map
                 .iter()
-                .map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Str, addr)),
+                .map(|(column_name, addr, _)| (column_name, ColumnType::Str, addr)),
         );
         columns.extend(
             self.bool_field_hash_map
                 .iter()
-                .map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Bool, addr)),
+                .map(|(column_name, addr, _)| (column_name, ColumnType::Bool, addr)),
         );
         columns.extend(
             self.ip_addr_field_hash_map
                 .iter()
-                .map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::IpAddr, addr)),
+                .map(|(column_name, addr, _)| (column_name, ColumnType::IpAddr, addr)),
         );
         columns.extend(
             self.datetime_field_hash_map
                 .iter()
-                .map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::DateTime, addr)),
+                .map(|(column_name, addr, _)| (column_name, ColumnType::DateTime, addr)),
         );
         columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
@@ -312,8 +317,12 @@
         let mut symbol_byte_buffer: Vec<u8> = Vec::new();
         for (column_name, column_type, addr) in columns {
             match column_type {
-                ColumnTypeCategory::Bool => {
-                    let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);
+                ColumnType::Bool | ColumnType::DateTime => {
+                    let column_writer: ColumnWriter = if column_type == ColumnType::Bool {
+                        self.bool_field_hash_map.read(addr)
+                    } else {
+                        self.datetime_field_hash_map.read(addr)
+                    };
                     let cardinality = column_writer.get_cardinality(num_docs);
                     let mut column_serializer =
                         serializer.serialize_column(column_name, ColumnType::Bool);
@@ -325,7 +334,7 @@
                         &mut column_serializer,
                     )?;
                 }
-                ColumnTypeCategory::IpAddr => {
+                ColumnType::IpAddr => {
                     let column_writer: ColumnWriter = self.ip_addr_field_hash_map.read(addr);
                     let cardinality = column_writer.get_cardinality(num_docs);
                     let mut column_serializer =
@@ -338,32 +347,35 @@
                         &mut column_serializer,
                     )?;
                 }
-                ColumnTypeCategory::Bytes | ColumnTypeCategory::Str => {
-                    let (column_type, str_column_writer): (ColumnType, StrOrBytesColumnWriter) =
-                        if column_type == ColumnTypeCategory::Bytes {
-                            (ColumnType::Bytes, self.bytes_field_hash_map.read(addr))
+                ColumnType::Bytes | ColumnType::Str => {
+                    let str_or_bytes_column_writer: StrOrBytesColumnWriter =
+                        if column_type == ColumnType::Bytes {
+                            self.bytes_field_hash_map.read(addr)
                         } else {
-                            (ColumnType::Str, self.str_field_hash_map.read(addr))
+                            self.str_field_hash_map.read(addr)
                         };
                     let dictionary_builder =
-                        &dictionaries[str_column_writer.dictionary_id as usize];
-                    let cardinality = str_column_writer.column_writer.get_cardinality(num_docs);
+                        &dictionaries[str_or_bytes_column_writer.dictionary_id as usize];
+                    let cardinality = str_or_bytes_column_writer
+                        .column_writer
+                        .get_cardinality(num_docs);
                     let mut column_serializer =
                         serializer.serialize_column(column_name, column_type);
                     serialize_bytes_or_str_column(
                         cardinality,
                         num_docs,
                         dictionary_builder,
-                        str_column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
+                        str_or_bytes_column_writer
+                            .operation_iterator(arena, &mut symbol_byte_buffer),
                         buffers,
                         &mut column_serializer,
                     )?;
                 }
-                ColumnTypeCategory::Numerical => {
+                ColumnType::I64 | ColumnType::F64 | ColumnType::U64 => {
                     let numerical_column_writer: NumericalColumnWriter =
                         self.numerical_field_hash_map.read(addr);
-                    let (numerical_type, cardinality) =
-                        numerical_column_writer.column_type_and_cardinality(num_docs);
+                    let numerical_type = column_type.numerical_type().unwrap();
+                    let cardinality = numerical_column_writer.cardinality(num_docs);
                     let mut column_serializer =
                         serializer.serialize_column(column_name, ColumnType::from(numerical_type));
                     serialize_numerical_column(
@@ -375,20 +387,6 @@
                         &mut column_serializer,
                     )?;
                 }
-                ColumnTypeCategory::DateTime => {
-                    let column_writer: ColumnWriter = self.datetime_field_hash_map.read(addr);
-                    let cardinality = column_writer.get_cardinality(num_docs);
-                    let mut column_serializer =
-                        serializer.serialize_column(column_name, ColumnType::DateTime);
-                    serialize_numerical_column(
-                        cardinality,
-                        num_docs,
-                        NumericalType::I64,
-                        column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
-                        buffers,
-                        &mut column_serializer,
-                    )?;
-                }
             };
         }
         serializer.finalize()?;

View File

@@ -362,13 +362,19 @@ impl SegmentTermCollector {
         let mut entries: Vec<(u32, TermBucketEntry)> =
             self.term_buckets.entries.into_iter().collect();

-        let order_by_key = self.req.order.target == OrderTarget::Key;
         let order_by_sub_aggregation =
             matches!(self.req.order.target, OrderTarget::SubAggregation(_));

         match self.req.order.target {
             OrderTarget::Key => {
-                // defer order and cut_off after loading the texts from the dictionary
+                // We rely on the fact, that term ordinals match the order of the strings
+                // TODO: We could have a special collector, that keeps only TOP n results at any
+                // time.
+                if self.req.order.order == Order::Desc {
+                    entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0));
+                } else {
+                    entries.sort_unstable_by_key(|bucket| bucket.0);
+                }
             }
             OrderTarget::SubAggregation(_name) => {
                 // don't sort and cut off since it's hard to make assumptions on the quality of the
@@ -384,12 +390,11 @@
             }
         }

-        let (term_doc_count_before_cutoff, mut sum_other_doc_count) =
-            if order_by_key || order_by_sub_aggregation {
-                (0, 0)
-            } else {
-                cut_off_buckets(&mut entries, self.req.segment_size as usize)
-            };
+        let (term_doc_count_before_cutoff, sum_other_doc_count) = if order_by_sub_aggregation {
+            (0, 0)
+        } else {
+            cut_off_buckets(&mut entries, self.req.segment_size as usize)
+        };

         let inverted_index = agg_with_accessor
             .inverted_index
@@ -412,6 +417,10 @@
         if self.req.min_doc_count == 0 {
             let mut stream = term_dict.stream()?;
             while let Some((key, _ord)) = stream.next() {
+                if dict.len() >= self.req.segment_size as usize {
+                    break;
+                }
+
                 let key = std::str::from_utf8(key)
                     .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
                 if !dict.contains_key(key) {
@@ -420,20 +429,6 @@
             }
         }

-        if order_by_key {
-            let mut dict_entries = dict.into_iter().collect_vec();
-            if self.req.order.order == Order::Desc {
-                dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key1.cmp(key2));
-            } else {
-                dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key2.cmp(key1));
-            }
-            let (_, sum_other_docs) =
-                cut_off_buckets(&mut dict_entries, self.req.segment_size as usize);
-            sum_other_doc_count += sum_other_docs;
-            dict = dict_entries.into_iter().collect();
-        }
-
         Ok(IntermediateBucketResult::Terms(
             IntermediateTermBucketResult {
                 entries: dict,
@@ -923,14 +918,14 @@ mod tests {
         ];
         let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;

-        // key desc
+        // key asc
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Desc,
+                        order: Order::Asc,
                         target: OrderTarget::Key,
                     }),
                     ..Default::default()
@@ -957,7 +952,7 @@
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Desc,
+                        order: Order::Asc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -981,14 +976,14 @@
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 3);

-        // key desc and segment_size cut_off
+        // key asc and segment_size cut_off
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Desc,
+                        order: Order::Asc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -1011,14 +1006,14 @@
             serde_json::Value::Null
         );

-        // key asc
+        // key desc
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Asc,
+                        order: Order::Desc,
                         target: OrderTarget::Key,
                     }),
                     ..Default::default()
@@ -1038,14 +1033,14 @@
         assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5);
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);

-        // key asc, size cut_off
+        // key desc, size cut_off
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Asc,
+                        order: Order::Desc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -1068,14 +1063,14 @@
         );
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 5);

-        // key asc, segment_size cut_off
+        // key desc, segment_size cut_off
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Asc,
+                        order: Order::Desc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -1352,68 +1347,3 @@
         Ok(())
     }
 }
-
-#[cfg(all(test, feature = "unstable"))]
-mod bench {
-    use itertools::Itertools;
-    use rand::seq::SliceRandom;
-    use rand::thread_rng;
-
-    use super::*;
-
-    fn get_collector_with_buckets(num_docs: u64) -> TermBuckets {
-        TermBuckets::from_req_and_validate(&Default::default(), num_docs as usize).unwrap()
-    }
-
-    fn get_rand_terms(total_terms: u64, num_terms_returned: u64) -> Vec<u64> {
-        let mut rng = thread_rng();
-        let all_terms = (0..total_terms - 1).collect_vec();
-        let mut vals = vec![];
-        for _ in 0..num_terms_returned {
-            let val = all_terms.as_slice().choose(&mut rng).unwrap();
-            vals.push(*val);
-        }
-        vals
-    }
-
-    fn bench_term_buckets(b: &mut test::Bencher, num_terms: u64, total_terms: u64) {
-        let mut collector = get_collector_with_buckets(total_terms);
-        let vals = get_rand_terms(total_terms, num_terms);
-        let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
-        let bucket_count: BucketCount = BucketCount {
-            bucket_count: Default::default(),
-            max_bucket_count: 1_000_001u32,
-        };
-        b.iter(|| {
-            for &val in &vals {
-                collector
-                    .increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None)
-                    .unwrap();
-            }
-        })
-    }
-
-    #[bench]
-    fn bench_term_buckets_500_of_1_000_000(b: &mut test::Bencher) {
-        bench_term_buckets(b, 500u64, 1_000_000u64)
-    }
-
-    #[bench]
-    fn bench_term_buckets_1_000_000_of_50_000(b: &mut test::Bencher) {
-        bench_term_buckets(b, 1_000_000u64, 50_000u64)
-    }
-
-    #[bench]
-    fn bench_term_buckets_1_000_000_of_50(b: &mut test::Bencher) {
-        bench_term_buckets(b, 1_000_000u64, 50u64)
-    }
-
-    #[bench]
-    fn bench_term_buckets_1_000_000_of_1_000_000(b: &mut test::Bencher) {
-        bench_term_buckets(b, 1_000_000u64, 1_000_000u64)
-    }
-}

View File

@@ -499,7 +499,7 @@ impl IntermediateTermBucketResult {
         match req.order.target {
             OrderTarget::Key => {
                 buckets.sort_by(|left, right| {
-                    if req.order.order == Order::Desc {
+                    if req.order.order == Order::Asc {
                         left.key.partial_cmp(&right.key)
                     } else {
                         right.key.partial_cmp(&left.key)

View File

@@ -1156,12 +1156,6 @@
             r#"FieldNotFound("not_exist_field")"#
         );

-        let agg_res = avg_on_field("scores_i64");
-        assert_eq!(
-            format!("{:?}", agg_res),
-            r#"InvalidArgument("Invalid field cardinality on field scores_i64 expected SingleValue, but got MultiValues")"#
-        );
-
         Ok(())
     }

View File

@@ -135,6 +135,8 @@ impl InvertedIndexReader {
         term_info: &TermInfo,
         option: IndexRecordOption,
     ) -> io::Result<SegmentPostings> {
+        let option = option.downgrade(self.record_option);
+
         let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
         let position_reader = {
             if option.has_positions() {

View File

@@ -249,7 +249,7 @@ impl SearcherInner {
         index: Index,
         segment_readers: Vec<SegmentReader>,
         generation: TrackedObject<SearcherGeneration>,
-        doc_store_cache_size: usize,
+        doc_store_cache_num_blocks: usize,
     ) -> io::Result<SearcherInner> {
         assert_eq!(
             &segment_readers
@@ -261,7 +261,7 @@
         );
         let store_readers: Vec<StoreReader> = segment_readers
             .iter()
-            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
+            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_num_blocks))
             .collect::<io::Result<Vec<_>>>()?;

         Ok(SearcherInner {

View File

@@ -134,9 +134,12 @@ impl SegmentReader {
         &self.fieldnorm_readers
     }

-    /// Accessor to the segment's `StoreReader`.
-    pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
-        StoreReader::open(self.store_file.clone(), cache_size)
+    /// Accessor to the segment's [`StoreReader`](crate::store::StoreReader).
+    ///
+    /// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
+    /// The size of blocks is configurable, this should be reflexted in the
+    pub fn get_store_reader(&self, cache_num_blocks: usize) -> io::Result<StoreReader> {
+        StoreReader::open(self.store_file.clone(), cache_num_blocks)
     }

     /// Open a new segment for reading.

View File

@@ -834,20 +834,23 @@ mod tests {
         // This is a bit of a contrived example.
         let tokens = PreTokenizedString {
             text: "contrived-example".to_string(), //< I can't think of a use case where this corner case happens in real life.
-            tokens: vec![Token { // Not the last token, yet ends after the last token.
-                offset_from: 0,
-                offset_to: 14,
-                position: 0,
-                text: "long_token".to_string(),
-                position_length: 3,
-            },
-            Token {
-                offset_from: 0,
-                offset_to: 14,
-                position: 1,
-                text: "short".to_string(),
-                position_length: 1,
-            }],
+            tokens: vec![
+                Token {
+                    // Not the last token, yet ends after the last token.
+                    offset_from: 0,
+                    offset_to: 14,
+                    position: 0,
+                    text: "long_token".to_string(),
+                    position_length: 3,
+                },
+                Token {
+                    offset_from: 0,
+                    offset_to: 14,
+                    position: 1,
+                    text: "short".to_string(),
+                    position_length: 1,
+                },
+            ],
         };
         doc.add_pre_tokenized_text(text, tokens);
         doc.add_text(text, "hello");

View File

@@ -109,6 +109,7 @@ impl TermQuery {
         } else {
             IndexRecordOption::Basic
         };
+
         Ok(TermWeight::new(
             self.term.clone(),
             index_record_option,

View File

@@ -44,7 +44,7 @@ pub struct IndexReaderBuilder {
     index: Index,
     warmers: Vec<Weak<dyn Warmer>>,
     num_warming_threads: usize,
-    doc_store_cache_size: usize,
+    doc_store_cache_num_blocks: usize,
 }

 impl IndexReaderBuilder {
@@ -55,7 +55,7 @@ impl IndexReaderBuilder {
             index,
             warmers: Vec::new(),
             num_warming_threads: 1,
-            doc_store_cache_size: DOCSTORE_CACHE_CAPACITY,
+            doc_store_cache_num_blocks: DOCSTORE_CACHE_CAPACITY,
         }
     }
@@ -72,7 +72,7 @@ impl IndexReaderBuilder {
             searcher_generation_inventory.clone(),
         )?;
         let inner_reader = InnerIndexReader::new(
-            self.doc_store_cache_size,
+            self.doc_store_cache_num_blocks,
             self.index,
             warming_state,
             searcher_generation_inventory,
@@ -119,8 +119,11 @@ impl IndexReaderBuilder {
     ///
     /// The doc store readers cache by default DOCSTORE_CACHE_CAPACITY(100) decompressed blocks.
     #[must_use]
-    pub fn doc_store_cache_size(mut self, doc_store_cache_size: usize) -> IndexReaderBuilder {
-        self.doc_store_cache_size = doc_store_cache_size;
+    pub fn doc_store_cache_num_blocks(
+        mut self,
+        doc_store_cache_num_blocks: usize,
+    ) -> IndexReaderBuilder {
+        self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
         self
     }
@@ -151,7 +154,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
 }

 struct InnerIndexReader {
-    doc_store_cache_size: usize,
+    doc_store_cache_num_blocks: usize,
     index: Index,
     warming_state: WarmingState,
     searcher: arc_swap::ArcSwap<SearcherInner>,
@@ -161,7 +164,7 @@ struct InnerIndexReader {
 impl InnerIndexReader {
     fn new(
-        doc_store_cache_size: usize,
+        doc_store_cache_num_blocks: usize,
         index: Index,
         warming_state: WarmingState,
         // The searcher_generation_inventory is not used as source, but as target to track the
@@ -172,13 +175,13 @@ impl InnerIndexReader {
         let searcher = Self::create_searcher(
             &index,
-            doc_store_cache_size,
+            doc_store_cache_num_blocks,
             &warming_state,
             &searcher_generation_counter,
             &searcher_generation_inventory,
         )?;
         Ok(InnerIndexReader {
-            doc_store_cache_size,
+            doc_store_cache_num_blocks,
             index,
             warming_state,
             searcher: ArcSwap::from(searcher),
@@ -214,7 +217,7 @@ impl InnerIndexReader {
     fn create_searcher(
         index: &Index,
-        doc_store_cache_size: usize,
+        doc_store_cache_num_blocks: usize,
         warming_state: &WarmingState,
         searcher_generation_counter: &Arc<AtomicU64>,
         searcher_generation_inventory: &Inventory<SearcherGeneration>,
@@ -232,7 +235,7 @@ impl InnerIndexReader {
             index.clone(),
             segment_readers,
             searcher_generation,
-            doc_store_cache_size,
+            doc_store_cache_num_blocks,
         )?);
         warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
@@ -242,7 +245,7 @@ impl InnerIndexReader {
     fn reload(&self) -> crate::Result<()> {
         let searcher = Self::create_searcher(
             &self.index,
-            self.doc_store_cache_size,
+            self.doc_store_cache_num_blocks,
             &self.warming_state,
             &self.searcher_generation_counter,
             &self.searcher_generation_inventory,
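
A usage sketch of the renamed builder knob, assuming this branch's API (`doc_store_cache_num_blocks` replaces `doc_store_cache_size`); the helper function is hypothetical:

    use tantivy::{Index, IndexReader};

    // Hypothetical helper: open a reader whose doc store LRU keeps 50 decompressed
    // blocks per segment, using the renamed builder method from this compare.
    fn open_reader(index: &Index) -> tantivy::Result<IndexReader> {
        index
            .reader_builder()
            .doc_store_cache_num_blocks(50)
            .try_into()
    }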

View File

@@ -49,4 +49,17 @@ impl IndexRecordOption {
             IndexRecordOption::WithFreqsAndPositions => true,
         }
     }
+
+    /// Downgrades to the next level if provided `IndexRecordOption` is unavailable.
+    pub fn downgrade(&self, other: IndexRecordOption) -> IndexRecordOption {
+        use IndexRecordOption::*;
+        match (other, self) {
+            (WithFreqsAndPositions, WithFreqsAndPositions) => WithFreqsAndPositions,
+            (WithFreqs, WithFreqs) => WithFreqs,
+            (WithFreqsAndPositions, WithFreqs) => WithFreqs,
+            (WithFreqs, WithFreqsAndPositions) => WithFreqs,
+            _ => Basic,
+        }
+    }
 }

View File

@@ -375,7 +375,8 @@ where B: AsRef<[u8]>
     ///
     /// Do NOT rely on this byte representation in the index.
     /// This value is likely to change in the future.
-    pub(crate) fn as_slice(&self) -> &[u8] {
+    #[inline(always)]
+    pub fn as_slice(&self) -> &[u8] {
         self.0.as_ref()
     }
 }

View File

@@ -90,7 +90,7 @@ impl CheckpointBlock {
             return Ok(());
         }
         let mut doc = read_u32_vint(data);
-        let mut start_offset = read_u32_vint(data) as usize;
+        let mut start_offset = VInt::deserialize_u64(data)? as usize;
         for _ in 0..len {
             let num_docs = read_u32_vint(data);
             let block_num_bytes = read_u32_vint(data) as usize;
@@ -147,6 +147,15 @@ mod tests {
         test_aux_ser_deser(&checkpoints)
     }

+    #[test]
+    fn test_block_serialize_large_byte_range() -> io::Result<()> {
+        let checkpoints = vec![Checkpoint {
+            doc_range: 10..12,
+            byte_range: 8_000_000_000..9_000_000_000,
+        }];
+        test_aux_ser_deser(&checkpoints)
+    }
+
     #[test]
     fn test_block_serialize() -> io::Result<()> {
         let offsets: Vec<usize> = (0..11).map(|i| i * i * i).collect();

View File

@@ -4,8 +4,8 @@
 //! order to be handled in the `Store`.
 //!
 //! Internally, documents (or rather their stored fields) are serialized to a buffer.
-//! When the buffer exceeds 16K, the buffer is compressed using `brotli`, `LZ4` or `snappy`
-//! and the resulting block is written to disk.
+//! When the buffer exceeds `block_size` (defaults to 16K), the buffer is compressed using `brotli`,
+//! `LZ4` or `snappy` and the resulting block is written to disk.
 //!
 //! One can then request for a specific `DocId`.
 //! A skip list helps navigating to the right block,
@@ -28,8 +28,6 @@
 //! - at the segment level, the
 //!   [`SegmentReader`'s `doc` method](../struct.SegmentReader.html#method.doc)
 //! - at the index level, the [`Searcher::doc()`](crate::Searcher::doc) method
-//!
-//! !

 mod compressors;
 mod decompressors;

View File

@@ -114,7 +114,10 @@ impl Sum for CacheStats {
 impl StoreReader {
     /// Opens a store reader
-    pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
+    ///
+    /// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
+    /// The size of blocks is configurable, this should be reflexted in the
+    pub fn open(store_file: FileSlice, cache_num_blocks: usize) -> io::Result<StoreReader> {
         let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
         let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
@@ -125,8 +128,8 @@
             decompressor: footer.decompressor,
             data: data_file,
             cache: BlockCache {
-                cache: NonZeroUsize::new(cache_size)
-                    .map(|cache_size| Mutex::new(LruCache::new(cache_size))),
+                cache: NonZeroUsize::new(cache_num_blocks)
+                    .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
                 cache_hits: Default::default(),
                 cache_misses: Default::default(),
             },